From b6b8ebe122ce1b48628a1aeb3f1e60542402033a Mon Sep 17 00:00:00 2001 From: Robert Bradshaw Date: Sat, 9 Sep 2023 22:51:14 -0700 Subject: [PATCH 1/9] [YAML] Implement basic java mapping operations. --- .../JavaMapToFieldsTransformProvider.java | 251 ++++++++++++++ .../transforms/providers/JavaRowUdf.java | 321 ++++++++++++++++++ .../transforms/providers/StringCompiler.java | 212 ++++++++++++ .../JavaMapToFieldsTransformProviderTest.java | 194 +++++++++++ .../transforms/providers/JavaRowUdfTest.java | 158 +++++++++ .../providers/StringCompilerTest.java | 70 ++++ 6 files changed, 1206 insertions(+) create mode 100644 sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/transforms/providers/JavaMapToFieldsTransformProvider.java create mode 100644 sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/transforms/providers/JavaRowUdf.java create mode 100644 sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/transforms/providers/StringCompiler.java create mode 100644 sdks/java/core/src/test/java/org/apache/beam/sdk/schemas/transforms/providers/JavaMapToFieldsTransformProviderTest.java create mode 100644 sdks/java/core/src/test/java/org/apache/beam/sdk/schemas/transforms/providers/JavaRowUdfTest.java create mode 100644 sdks/java/core/src/test/java/org/apache/beam/sdk/schemas/transforms/providers/StringCompilerTest.java diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/transforms/providers/JavaMapToFieldsTransformProvider.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/transforms/providers/JavaMapToFieldsTransformProvider.java new file mode 100644 index 000000000000..7bfdcd5ccfb4 --- /dev/null +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/transforms/providers/JavaMapToFieldsTransformProvider.java @@ -0,0 +1,251 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. 
See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.beam.sdk.schemas.transforms.providers; + +import com.google.auto.service.AutoService; +import com.google.auto.value.AutoValue; +import java.net.MalformedURLException; +import java.util.ArrayList; +import java.util.Collections; +import java.util.List; +import java.util.Map; +import javax.annotation.Nullable; +import org.apache.beam.sdk.schemas.AutoValueSchema; +import org.apache.beam.sdk.schemas.Schema; +import org.apache.beam.sdk.schemas.annotations.DefaultSchema; +import org.apache.beam.sdk.schemas.annotations.SchemaFieldDescription; +import org.apache.beam.sdk.schemas.transforms.SchemaTransform; +import org.apache.beam.sdk.schemas.transforms.SchemaTransformProvider; +import org.apache.beam.sdk.schemas.transforms.TypedSchemaTransformProvider; +import org.apache.beam.sdk.transforms.DoFn; +import org.apache.beam.sdk.transforms.ParDo; +import org.apache.beam.sdk.values.PCollectionRowTuple; +import org.apache.beam.sdk.values.PCollectionTuple; +import org.apache.beam.sdk.values.Row; +import org.apache.beam.sdk.values.TupleTag; +import org.apache.beam.sdk.values.TupleTagList; + +/** + * An implementation of {@link TypedSchemaTransformProvider} for MapToFields for the java language. + * + *

Internal only: This class is actively being worked on, and it will likely change. We + * provide no backwards compatibility guarantees, and it should not be implemented outside the Beam + * repository. + */ +@SuppressWarnings({ + "nullness" // TODO(https://github.com/apache/beam/issues/20497) +}) +@AutoService(SchemaTransformProvider.class) +public class JavaMapToFieldsTransformProvider + extends TypedSchemaTransformProvider { + protected static final String INPUT_ROWS_TAG = "input"; + protected static final String OUTPUT_ROWS_TAG = "output"; + + @Override + protected Class configurationClass() { + return Configuration.class; + } + + @Override + protected SchemaTransform from(Configuration configuration) { + return new JavaMapToFieldsTransform(configuration); + } + + @Override + public String identifier() { + return String.format("beam:schematransform:org.apache.beam:yaml:map_to_fields-java:v1"); + } + + @Override + public List inputCollectionNames() { + return Collections.singletonList(INPUT_ROWS_TAG); + } + + @Override + public List outputCollectionNames() { + return Collections.singletonList(OUTPUT_ROWS_TAG); + } + + @DefaultSchema(AutoValueSchema.class) + @AutoValue + public abstract static class Configuration { + @Nullable + public abstract String getLanguage(); + + @Nullable + public abstract Boolean getAppend(); + + @Nullable + public abstract List getDrop(); + + public abstract Map getFields(); + + @Nullable + public abstract ErrorHandling getError_handling(); + + public static Builder builder() { + return new AutoValue_JavaMapToFieldsTransformProvider_Configuration.Builder(); + } + + @AutoValue.Builder + public abstract static class Builder { + + public abstract Builder setLanguage(String language); + + public abstract Builder setAppend(Boolean append); + + public abstract Builder setDrop(List drop); + + public abstract Builder setFields(Map fields); + + public abstract Builder setError_handling(ErrorHandling error_handling); + + public abstract 
Configuration build(); + } + + @AutoValue + public abstract static class ErrorHandling { + @SchemaFieldDescription("The name of the output PCollection containing failed writes.") + public abstract String getOutput(); + + public static Builder builder() { + return new AutoValue_JavaMapToFieldsTransformProvider_Configuration_ErrorHandling.Builder(); + } + + @AutoValue.Builder + public abstract static class Builder { + public abstract Builder setOutput(String output); + + public abstract ErrorHandling build(); + } + } + } + + /** A {@link SchemaTransform} for MapToFields-java. */ + protected static class JavaMapToFieldsTransform extends SchemaTransform { + + private final Configuration configuration; + + JavaMapToFieldsTransform(Configuration configuration) { + this.configuration = configuration; + } + + @Override + public PCollectionRowTuple expand(PCollectionRowTuple input) { + Schema inputSchema = input.get(INPUT_ROWS_TAG).getSchema(); + Schema.Builder outputSchemaBuilder = new Schema.Builder(); + // TODO(yaml): Consider allowing the full java schema naming syntax + // (perhaps as a different dialect/language). + boolean append = configuration.getAppend() != null && configuration.getAppend(); + List toDrop = + configuration.getDrop() == null ? 
Collections.emptyList() : configuration.getDrop(); + List udfs = new ArrayList<>(); + if (append) { + for (Schema.Field field : inputSchema.getFields()) { + if (!toDrop.contains(field.getName())) { + try { + udfs.add( + new JavaRowUdf( + JavaRowUdf.Configuration.builder().setExpression(field.getName()).build(), + inputSchema)); + } catch (MalformedURLException + | ReflectiveOperationException + | StringCompiler.CompileException exn) { + throw new RuntimeException(exn); + } + outputSchemaBuilder = outputSchemaBuilder.addField(field); + } + } + } + for (Map.Entry entry : + configuration.getFields().entrySet()) { + if ("generic".equals(configuration.getLanguage())) { + String expr = entry.getValue().getExpression(); + if (expr == null || !inputSchema.hasField(expr)) { + throw new IllegalArgumentException( + "Unknown field or missing language specification for '" + entry.getKey() + "'"); + } + } + try { + JavaRowUdf udf = new JavaRowUdf(entry.getValue(), inputSchema); + udfs.add(udf); + outputSchemaBuilder = outputSchemaBuilder.addField(entry.getKey(), udf.getOutputType()); + } catch (MalformedURLException + | ReflectiveOperationException + | StringCompiler.CompileException exn) { + throw new RuntimeException(exn); + } + } + Schema outputSchema = outputSchemaBuilder.build(); + boolean handleErrors = + configuration.getError_handling() != null + && configuration.getError_handling().getOutput() != null; + Schema errorSchema = + Schema.of( + Schema.Field.of("failed_row", Schema.FieldType.row(inputSchema)), + Schema.Field.of("error_message", Schema.FieldType.STRING)); + + PCollectionTuple pcolls = + input + .get(INPUT_ROWS_TAG) + .apply( + "MapToFields", + ParDo.of(createDoFn(udfs, outputSchema, errorSchema, handleErrors)) + .withOutputTags(mappedValues, TupleTagList.of(errorValues))); + pcolls.get(mappedValues).setRowSchema(outputSchema); + pcolls.get(errorValues).setRowSchema(errorSchema); + + PCollectionRowTuple result = + PCollectionRowTuple.of(OUTPUT_ROWS_TAG, 
pcolls.get(mappedValues)); + if (handleErrors) { + result = result.and(configuration.getError_handling().getOutput(), pcolls.get(errorValues)); + } + return result; + } + + private static final TupleTag mappedValues = new TupleTag() {}; + private static final TupleTag errorValues = new TupleTag() {}; + + private static DoFn createDoFn( + List udfs, Schema outputSchema, Schema errorSchema, boolean handleErrors) { + return new DoFn() { + @ProcessElement + public void processElement(@Element Row inputRow, MultiOutputReceiver out) { + try { + Row.Builder outputRow = Row.withSchema(outputSchema); + for (JavaRowUdf udf : udfs) { + outputRow.addValue(udf.getFunction().apply(inputRow)); + } + out.get(mappedValues).output(outputRow.build()); + } catch (Exception exn) { + if (handleErrors) { + out.get(errorValues) + .output( + Row.withSchema(errorSchema) + .withFieldValue("failed_row", inputRow) + .withFieldValue("error_message", exn.getMessage()) + .build()); + } else { + throw new RuntimeException(exn); + } + } + } + }; + } + } +} diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/transforms/providers/JavaRowUdf.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/transforms/providers/JavaRowUdf.java new file mode 100644 index 000000000000..8dbe0f21fd9f --- /dev/null +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/transforms/providers/JavaRowUdf.java @@ -0,0 +1,321 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.beam.sdk.schemas.transforms.providers; + +import static org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Preconditions.checkArgument; + +import com.google.auto.value.AutoValue; +import java.io.Serializable; +import java.lang.reflect.InvocationTargetException; +import java.lang.reflect.Method; +import java.lang.reflect.Modifier; +import java.lang.reflect.Type; +import java.math.BigDecimal; +import java.net.MalformedURLException; +import java.net.URL; +import java.net.URLClassLoader; +import java.util.Collections; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.function.Function; +import java.util.regex.Matcher; +import java.util.regex.Pattern; +import javax.annotation.Nullable; +import org.apache.beam.sdk.schemas.AutoValueSchema; +import org.apache.beam.sdk.schemas.FieldValueTypeInformation; +import org.apache.beam.sdk.schemas.Schema; +import org.apache.beam.sdk.schemas.annotations.DefaultSchema; +import org.apache.beam.sdk.schemas.annotations.SchemaFieldDescription; +import org.apache.beam.sdk.schemas.utils.StaticSchemaInference; +import org.apache.beam.sdk.values.Row; +import org.apache.beam.sdk.values.TypeDescriptor; +import org.apache.beam.sdk.values.TypeDescriptors; +import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Preconditions; +import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Strings; +import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.ImmutableMap; + +public class JavaRowUdf implements 
Serializable { + private final Configuration config; + private final Schema inputSchema; + private final Schema.FieldType outputType; + + // Transient so we don't have to worry about issues serializing these dynamically created classes. + // While this is lazily computed, it is always computed on class construction, so any errors + // should still be caught at construction time. + private transient Function function; + + // Find or implement the inverse of StaticSchemaInference.fieldFromType + @DefaultSchema(AutoValueSchema.class) + @AutoValue + public abstract static class Configuration implements Serializable { + @SchemaFieldDescription("Source code of a java expression in terms of the schema fields.") + @Nullable + public abstract String getExpression(); + + @SchemaFieldDescription( + "Source code of a public class implementing Function for some schema-compatible T.") + @Nullable + public abstract String getCallable(); + + @SchemaFieldDescription("Path to a jar file implementing the function referenced in name.") + @Nullable + public abstract String getPath(); + + @SchemaFieldDescription( + "Fully qualified name of either a class implementing Function (e.g. com.pkg.MyFunction), " + + "or a method taking a single Row argument (e.g. com.pkg.MyClass::methodName). " + + "If a method is passed, it must either be static or belong to a class with a public nullary constructor.") + @Nullable + public abstract String getName(); + + public void validate() { + checkArgument( + Strings.isNullOrEmpty(getPath()) || !Strings.isNullOrEmpty(getName()), + "Specifying a path only allows if a name is provided."); + int totalArgs = + (Strings.isNullOrEmpty(getExpression()) ? 0 : 1) + + (Strings.isNullOrEmpty(getCallable()) ? 0 : 1) + + (Strings.isNullOrEmpty(getName()) ? 
0 : 1); + checkArgument( + totalArgs == 1, "Exactly one of expression, callable, or name must be provided."); + } + + public static Configuration.Builder builder() { + return new AutoValue_JavaRowUdf_Configuration.Builder(); + } + + @AutoValue.Builder + public abstract static class Builder { + public abstract Configuration.Builder setExpression(String expression); + + public abstract Configuration.Builder setCallable(String callable); + + public abstract Configuration.Builder setPath(String path); + + public abstract Configuration.Builder setName(String name); + + public abstract Configuration build(); + } + } + + public JavaRowUdf(Configuration config, Schema inputSchema) + throws MalformedURLException, ReflectiveOperationException, StringCompiler.CompileException { + this.config = config; + this.inputSchema = inputSchema; + FunctionAndType functionAndType = createFunction(config, inputSchema); + this.outputType = functionAndType.outputType; + this.function = functionAndType.function; + } + + public Schema.FieldType getOutputType() { + return outputType; + } + + public Function getFunction() + throws MalformedURLException, ReflectiveOperationException, StringCompiler.CompileException { + if (function == null) { + FunctionAndType functionAndType = createFunction(config, inputSchema); + assert functionAndType.outputType.equals(outputType); + function = functionAndType.function; + } + return function; + } + + private static class FunctionAndType { + public final Schema.FieldType outputType; + public final Function function; + + public FunctionAndType(Function function) { + this(outputOf(function), function); + } + + public FunctionAndType(Type outputType, Function function) { + this(TypeDescriptor.of(outputType), function); + } + + public FunctionAndType(TypeDescriptor outputType, Function function) { + this( + StaticSchemaInference.fieldFromType(outputType, new EmptyFieldValueTypeSupplier()), + function); + } + + public FunctionAndType(Schema.FieldType outputType, 
Function function) { + this.outputType = outputType; + this.function = function; + } + + public static TypeDescriptor outputOf(Function fn) { + return TypeDescriptors.extractFromTypeParameters( + fn, + Function.class, + new TypeDescriptors.TypeVariableExtractor, OutputT>() {}); + } + } + + @SuppressWarnings({ + "nullness" // TODO(https://github.com/apache/beam/issues/20497) + }) + private static FunctionAndType createFunction(Configuration config, Schema inputSchema) + throws ReflectiveOperationException, StringCompiler.CompileException, MalformedURLException { + config.validate(); + if (!Strings.isNullOrEmpty(config.getExpression())) { + return createFunctionFromExpression(config.getExpression(), inputSchema); + } else if (!Strings.isNullOrEmpty(config.getCallable())) { + return createFuctionFromCallable(config.getCallable()); + } else if (!Strings.isNullOrEmpty(config.getName())) { + return createFunctionFromName(config.getName(), config.getPath()); + } else { + throw new UnsupportedOperationException(config.toString()); + } + } + + private static FunctionAndType createFunctionFromExpression(String expression, Schema inputSchema) + throws StringCompiler.CompileException, ReflectiveOperationException { + if (inputSchema.hasField(expression)) { + final int ix = inputSchema.indexOf(expression); + return new FunctionAndType( + inputSchema.getField(expression).getType(), (Row row) -> row.getValue(ix)); + } else { + Map fieldTypes = new HashMap<>(); + for (Schema.Field field : inputSchema.getFields()) { + if (expression.indexOf(field.getName()) != -1) { + fieldTypes.put(field.getName(), typeFromFieldType(field.getType())); + } + } + Type type = StringCompiler.guessExpressionType(expression, fieldTypes); + StringBuilder source = new StringBuilder(); + source.append("import java.util.function.Function;\n"); + source.append("import " + Row.class.getTypeName() + ";\n"); + source.append("public class Eval implements Function {\n"); + source.append(" public Object apply(Row 
__row__) {\n"); + for (Map.Entry fieldEntry : fieldTypes.entrySet()) { + source.append( + String.format( + " %s %s = (%s) __row__.getValue(%s);\n", + fieldEntry.getValue().getTypeName(), + fieldEntry.getKey(), + fieldEntry.getValue().getTypeName(), + inputSchema.indexOf(fieldEntry.getKey()))); + } + source.append(" return " + expression + ";\n"); + source.append(" }\n"); + source.append("}\n"); + return new FunctionAndType( + type, (Function) StringCompiler.getInstance("Eval", source.toString())); + } + } + + @SuppressWarnings({ + "nullness" // TODO(https://github.com/apache/beam/issues/20497) + }) + private static FunctionAndType createFuctionFromCallable(String callable) + throws StringCompiler.CompileException, ReflectiveOperationException { + Matcher matcher = + Pattern.compile("\\bpublic\\s+class\\s+(\\S+)", Pattern.MULTILINE).matcher(callable); + Preconditions.checkArgument(matcher.find(), "No public class defined in callable source."); + return new FunctionAndType( + (Function) StringCompiler.getInstance(matcher.group(1), callable.toString())); + } + + @SuppressWarnings({ + "nullness" // TODO(https://github.com/apache/beam/issues/20497) + }) + private static FunctionAndType createFunctionFromName(String name, String path) + throws ReflectiveOperationException, MalformedURLException { + ClassLoader classLoader = + path == null + ? 
ClassLoader.getSystemClassLoader() + : new URLClassLoader( + new URL[] {new URL("file://" + path)}, ClassLoader.getSystemClassLoader()); + String className, methodName = null; + if (name.indexOf("::") == -1) { + className = name; + methodName = null; + } else { + String[] parts = name.split("::", 2); + className = parts[0]; + methodName = parts[1]; + } + if (methodName == null) { + return new FunctionAndType( + (Function) + classLoader.loadClass(className).getDeclaredConstructor().newInstance()); + } else { + Class clazz = classLoader.loadClass(className); + Method method = clazz.getMethod(methodName, Row.class); + Object base = + Modifier.isStatic(method.getModifiers()) + ? null + : clazz.getDeclaredConstructor().newInstance(); + return new FunctionAndType( + method.getGenericReturnType(), + (Row row) -> { + try { + return method.invoke(base, row); + } catch (IllegalAccessException | InvocationTargetException exn) { + throw new RuntimeException(exn); + } + }); + } + } + + private static class EmptyFieldValueTypeSupplier + implements org.apache.beam.sdk.schemas.utils.FieldValueTypeSupplier { + @Override + public List get(Class clazz) { + return Collections.emptyList(); + } + } + + private static final Map NULLABLE_PRIMITIVES = + ImmutableMap.builder() + .put(Schema.TypeName.BYTE, Byte.class) + .put(Schema.TypeName.INT16, Short.class) + .put(Schema.TypeName.INT32, Integer.class) + .put(Schema.TypeName.INT64, Long.class) + .put(Schema.TypeName.FLOAT, Float.class) + .put(Schema.TypeName.DOUBLE, Double.class) + .put(Schema.TypeName.BOOLEAN, Boolean.class) + .put(Schema.TypeName.DECIMAL, BigDecimal.class) + .build(); + + private static final Map NON_NULLABLE_PRIMITIVES = + ImmutableMap.builder() + .put(Schema.TypeName.BYTE, byte.class) + .put(Schema.TypeName.INT16, short.class) + .put(Schema.TypeName.INT32, int.class) + .put(Schema.TypeName.INT64, long.class) + .put(Schema.TypeName.FLOAT, float.class) + .put(Schema.TypeName.DOUBLE, double.class) + 
.put(Schema.TypeName.BOOLEAN, boolean.class) + .put(Schema.TypeName.DECIMAL, BigDecimal.class) + .build(); + + private static Type typeFromFieldType(Schema.FieldType fieldType) { + Map primitivesMap = + fieldType.getNullable() ? NULLABLE_PRIMITIVES : NON_NULLABLE_PRIMITIVES; + if (primitivesMap.containsKey(fieldType.getTypeName())) { + return primitivesMap.get(fieldType.getTypeName()); + } else if (fieldType.getRowSchema() != null) { + return Row.class; + } else { + throw new UnsupportedOperationException(fieldType.toString()); + } + } +} diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/transforms/providers/StringCompiler.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/transforms/providers/StringCompiler.java new file mode 100644 index 000000000000..f568ced53167 --- /dev/null +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/transforms/providers/StringCompiler.java @@ -0,0 +1,212 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.beam.sdk.schemas.transforms.providers; + +import java.io.ByteArrayOutputStream; +import java.io.OutputStream; +import java.lang.reflect.Method; +import java.lang.reflect.Type; +import java.net.URI; +import java.security.SecureClassLoader; +import java.util.Collections; +import java.util.HashMap; +import java.util.Locale; +import java.util.Map; +import javax.tools.Diagnostic; +import javax.tools.DiagnosticCollector; +import javax.tools.FileObject; +import javax.tools.ForwardingJavaFileManager; +import javax.tools.JavaCompiler; +import javax.tools.JavaFileObject; +import javax.tools.SimpleJavaFileObject; +import javax.tools.StandardJavaFileManager; +import javax.tools.ToolProvider; + +public class StringCompiler { + public static class CompileException extends Exception { + private final DiagnosticCollector diagnostics; + + public CompileException(DiagnosticCollector diagnostics) { + super(diagnostics.getDiagnostics().toString()); + this.diagnostics = diagnostics; + } + + public DiagnosticCollector getDiagnostics() { + return diagnostics; + } + } + + // TODO(XXX): swap args? 
+ public static Class getClass(String name, String source) + throws CompileException, ClassNotFoundException { + JavaCompiler compiler = ToolProvider.getSystemJavaCompiler(); + InMemoryFileManager fileManager = + new InMemoryFileManager(compiler.getStandardFileManager(null, null, null)); + DiagnosticCollector diagnostics = new DiagnosticCollector<>(); + JavaCompiler.CompilationTask task = + compiler.getTask( + null, + fileManager, + diagnostics, + null, + null, + Collections.singletonList(new InMemoryFileManager.InputJavaFileObject(name, source))); + boolean result = task.call(); + if (!result) { + throw new CompileException(diagnostics); + } else { + return (Class) fileManager.getClassLoader().loadClass(name); + } + } + + public static Object getInstance(String name, String source) + throws CompileException, ReflectiveOperationException { + return getClass(name, source).getDeclaredConstructor().newInstance(); + } + + public static Type guessExpressionType(String expression, Map inputTypes) + throws StringCompiler.CompileException, ClassNotFoundException { + + String expectedError = "cannot be converted to __TypeGuesserHelper__.BadReturnType"; + + try { + StringCompiler.getClass( + "__TypeGuesserHelper__", typeGuesserSource(expression, inputTypes, "BadReturnType")); + // Must have returned null. + return Void.class; + } catch (StringCompiler.CompileException exn) { + // Use the error message to derive the actual type. 
+ for (Diagnostic d : exn.getDiagnostics().getDiagnostics()) { + String msg = d.getMessage(Locale.ROOT); + int expectedErrorIndex = msg.indexOf(expectedError); + if (expectedErrorIndex != -1) { + String typeSource = + msg.substring( + 1 + "incompatible types: ".length() + msg.lastIndexOf('\n', expectedErrorIndex), + expectedErrorIndex); + Class clazz = + StringCompiler.getClass( + "__TypeGuesserHelper__", typeGuesserSource(expression, inputTypes, typeSource)); + for (Method method : clazz.getMethods()) { + if (method.getName().equals("method")) { + return method.getGenericReturnType(); + } + } + // We should never get here. + throw new RuntimeException("Unable to locate declared method."); + } + } + // Must have been some other error. + throw exn; + } + } + + private static String typeGuesserSource( + String expression, Map inputTypes, String returnType) { + StringBuilder source = new StringBuilder(); + source.append("class __TypeGuesserHelper__ {\n"); + source.append(" private static class BadReturnType { private BadReturnType() {} }\n"); + source.append(" public static " + returnType + " method(\n"); + boolean first = true; + for (Map.Entry arg : inputTypes.entrySet()) { + if (first) { + first = false; + } else { + source.append(", "); + } + source.append(arg.getValue().getTypeName() + " " + arg.getKey()); + } + source.append(" ) {\n"); + source.append(" return " + expression + ";\n"); + source.append(" }\n"); + source.append("}\n"); + return source.toString(); + } + + private static class InMemoryFileManager + extends ForwardingJavaFileManager { + + private Map outputFileObjects = new HashMap<>(); + + public InMemoryFileManager(StandardJavaFileManager standardManager) { + super(standardManager); + } + + @Override + public JavaFileObject getJavaFileForOutput( + Location location, String className, JavaFileObject.Kind kind, FileObject sibling) { + + OutputJavaFileObject classAsBytes = new OutputJavaFileObject(className, kind); + outputFileObjects.put(className, 
classAsBytes); + return classAsBytes; + } + + public ClassLoader getClassLoader() { + return new SecureClassLoader() { + @Override + protected Class findClass(String name) throws ClassNotFoundException { + OutputJavaFileObject fileObject = outputFileObjects.get(name); + if (fileObject == null) { + throw new ClassNotFoundException(name); + } else { + byte[] classBytes = fileObject.getBytes(); + return defineClass(name, classBytes, 0, classBytes.length); + } + } + }; + } + + @Override + public ClassLoader getClassLoader(Location location) { + return getClassLoader(); + } + + private static class InputJavaFileObject extends SimpleJavaFileObject { + private String source; + + public InputJavaFileObject(String name, String source) { + super( + URI.create("input:///" + name.replace('.', '/') + Kind.SOURCE.extension), Kind.SOURCE); + this.source = source; + } + + @Override + public CharSequence getCharContent(boolean ignoreEncodingErrors) { + return source; + } + } + + private static class OutputJavaFileObject extends SimpleJavaFileObject { + + private ByteArrayOutputStream content = new ByteArrayOutputStream(); + + public OutputJavaFileObject(String name, Kind kind) { + super(URI.create("output:///" + name.replace('.', '/') + kind.extension), kind); + } + + public byte[] getBytes() { + return content.toByteArray(); + } + + @Override + public OutputStream openOutputStream() { + return content; + } + } + } +} diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/schemas/transforms/providers/JavaMapToFieldsTransformProviderTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/schemas/transforms/providers/JavaMapToFieldsTransformProviderTest.java new file mode 100644 index 000000000000..f90950407c20 --- /dev/null +++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/schemas/transforms/providers/JavaMapToFieldsTransformProviderTest.java @@ -0,0 +1,194 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license 
agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.beam.sdk.schemas.transforms.providers; + +import java.util.Collections; +import org.apache.beam.sdk.schemas.Schema; +import org.apache.beam.sdk.testing.NeedsRunner; +import org.apache.beam.sdk.testing.PAssert; +import org.apache.beam.sdk.testing.TestPipeline; +import org.apache.beam.sdk.transforms.Create; +import org.apache.beam.sdk.values.PCollection; +import org.apache.beam.sdk.values.PCollectionRowTuple; +import org.apache.beam.sdk.values.Row; +import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.ImmutableMap; +import org.junit.Rule; +import org.junit.Test; +import org.junit.experimental.categories.Category; +import org.junit.runner.RunWith; +import org.junit.runners.JUnit4; + +/** Tests for {@link JavaMapToFieldsTransformProvider}. 
*/ +@RunWith(JUnit4.class) +public class JavaMapToFieldsTransformProviderTest { + @Rule public TestPipeline pipeline = TestPipeline.create(); + + @Test + @Category(NeedsRunner.class) + public void testRenameFields() { + Schema inputSchema = + Schema.of( + Schema.Field.of("a", Schema.FieldType.STRING), + Schema.Field.of("b", Schema.FieldType.INT32), + Schema.Field.of("c", Schema.FieldType.DOUBLE)); + + PCollection input = + pipeline + .apply( + Create.of( + Row.withSchema(inputSchema).addValues("foo", 2, 0.5).build(), + Row.withSchema(inputSchema).addValues("bar", 4, 0.25).build())) + .setRowSchema(inputSchema); + + PCollection renamed = + PCollectionRowTuple.of(JavaMapToFieldsTransformProvider.INPUT_ROWS_TAG, input) + .apply( + new JavaMapToFieldsTransformProvider() + .from( + JavaMapToFieldsTransformProvider.Configuration.builder() + .setFields( + ImmutableMap.of( + "newC", + JavaRowUdf.Configuration.builder().setExpression("c").build(), + "newA", + JavaRowUdf.Configuration.builder().setExpression("a").build())) + .build())) + .get(JavaMapToFieldsTransformProvider.OUTPUT_ROWS_TAG); + + Schema outputSchema = renamed.getSchema(); + + PAssert.that(renamed) + .containsInAnyOrder( + Row.withSchema(outputSchema) + .withFieldValue("newC", 0.5) + .withFieldValue("newA", "foo") + .build(), + Row.withSchema(outputSchema) + .withFieldValue("newC", 0.25) + .withFieldValue("newA", "bar") + .build()); + + pipeline.run(); + } + + @Test + @Category(NeedsRunner.class) + public void testAppendAndDropFields() { + Schema inputSchema = + Schema.of( + Schema.Field.of("a", Schema.FieldType.INT32), + Schema.Field.of("b", Schema.FieldType.DOUBLE)); + + PCollection input = + pipeline + .apply( + Create.of( + Row.withSchema(inputSchema).addValues(2, 0.5).build(), + Row.withSchema(inputSchema).addValues(4, 0.25).build())) + .setRowSchema(inputSchema); + + PCollection renamed = + PCollectionRowTuple.of(JavaMapToFieldsTransformProvider.INPUT_ROWS_TAG, input) + .apply( + new 
JavaMapToFieldsTransformProvider() + .from( + JavaMapToFieldsTransformProvider.Configuration.builder() + .setAppend(true) + .setDrop(Collections.singletonList("b")) + .setFields( + ImmutableMap.of( + "sum", + JavaRowUdf.Configuration.builder() + .setExpression("a+b") + .build())) + .build())) + .get(JavaMapToFieldsTransformProvider.OUTPUT_ROWS_TAG); + + Schema outputSchema = renamed.getSchema(); + + PAssert.that(renamed) + .containsInAnyOrder( + Row.withSchema(outputSchema).withFieldValue("a", 2).withFieldValue("sum", 2.5).build(), + Row.withSchema(outputSchema) + .withFieldValue("a", 4) + .withFieldValue("sum", 4.25) + .build()); + + pipeline.run(); + } + + @Test + @Category(NeedsRunner.class) + public void testErrorHandling() { + Schema inputSchema = Schema.of(Schema.Field.of("x", Schema.FieldType.INT32)); + + PCollection input = + pipeline + .apply( + Create.of( + Row.withSchema(inputSchema).addValues(4).build(), + Row.withSchema(inputSchema).addValues(-1).build())) + .setRowSchema(inputSchema); + + PCollectionRowTuple result = + PCollectionRowTuple.of(JavaMapToFieldsTransformProvider.INPUT_ROWS_TAG, input) + .apply( + new JavaMapToFieldsTransformProvider() + .from( + JavaMapToFieldsTransformProvider.Configuration.builder() + .setFields( + ImmutableMap.of( + "sqrt", + JavaRowUdf.Configuration.builder() + .setCallable( + "import java.util.function.Function;" + + "import org.apache.beam.sdk.values.Row;" + + "public class Sqrt implements Function {" + + " public Double apply(Row row) {" + + " int x = row.getInt32(\"x\");" + + " if (x < 0) {" + + " throw new ArithmeticException(\"negative value\");" + + " } else {" + + " return Math.sqrt(x);" + + " }" + + " }" + + "}") + .build())) + .setError_handling( + JavaMapToFieldsTransformProvider.Configuration.ErrorHandling + .builder() + .setOutput("errors") + .build()) + .build())); + + PCollection sqrts = result.get(JavaMapToFieldsTransformProvider.OUTPUT_ROWS_TAG); + Schema outputSchema = sqrts.getSchema(); + 
PAssert.that(sqrts) + .containsInAnyOrder(Row.withSchema(outputSchema).withFieldValue("sqrt", 2.0).build()); + + PCollection errors = result.get("errors"); + Schema errorSchema = errors.getSchema(); + PAssert.that(errors) + .containsInAnyOrder( + Row.withSchema(errorSchema) + .withFieldValue("failed_row", Row.withSchema(inputSchema).addValues(-1).build()) + .withFieldValue("error_message", "negative value") + .build()); + pipeline.run(); + } +} diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/schemas/transforms/providers/JavaRowUdfTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/schemas/transforms/providers/JavaRowUdfTest.java new file mode 100644 index 000000000000..68cbbc8b1059 --- /dev/null +++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/schemas/transforms/providers/JavaRowUdfTest.java @@ -0,0 +1,158 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.beam.sdk.schemas.transforms.providers; + +import static org.junit.Assert.*; + +import java.net.MalformedURLException; +import java.util.function.Function; +import org.apache.beam.sdk.schemas.Schema; +import org.apache.beam.sdk.values.Row; +import org.junit.Test; + +public class JavaRowUdfTest { + + public final Schema TEST_SCHEMA = + Schema.of( + Schema.Field.of("anInt32", Schema.FieldType.INT32).withNullable(true), + Schema.Field.of("anInt64", Schema.FieldType.INT64).withNullable(true), + Schema.Field.of("aDouble", Schema.FieldType.DOUBLE).withNullable(true)); + + @Test + public void testExpressionUdf() + throws MalformedURLException, ReflectiveOperationException, StringCompiler.CompileException { + JavaRowUdf udf = + new JavaRowUdf( + JavaRowUdf.Configuration.builder().setExpression("anInt32 + anInt64").build(), + TEST_SCHEMA); + assertEquals(Schema.FieldType.INT64, udf.getOutputType()); + assertEquals( + 5L, + udf.getFunction() + .apply( + Row.withSchema(TEST_SCHEMA) + .withFieldValue("anInt32", 2) + .withFieldValue("anInt64", 3L) + .build())); + } + + @Test + public void testFieldNameExpressionUdf() + throws MalformedURLException, ReflectiveOperationException, StringCompiler.CompileException { + JavaRowUdf udf = + new JavaRowUdf( + JavaRowUdf.Configuration.builder().setExpression("anInt32").build(), TEST_SCHEMA); + assertEquals(Schema.FieldType.INT32.withNullable(true), udf.getOutputType()); + assertEquals( + 2, + udf.getFunction() + .apply( + Row.withSchema(TEST_SCHEMA) + .withFieldValue("anInt32", 2) + .withFieldValue("anInt64", 3L) + .build())); + } + + @Test + public void testCallableUdf() + throws MalformedURLException, ReflectiveOperationException, StringCompiler.CompileException { + JavaRowUdf udf = + new JavaRowUdf( + JavaRowUdf.Configuration.builder() + .setCallable( + String.join( + "\n", + "import org.apache.beam.sdk.values.Row;", + "import java.util.function.Function;", + "public class MyFunction implements Function {", + " 
public Double apply(Row row) { return 1.0 / row.getDouble(\"aDouble\"); }", + "}")) + .build(), + TEST_SCHEMA); + assertEquals(Schema.FieldType.DOUBLE, udf.getOutputType()); + assertEquals( + 0.25, + udf.getFunction() + .apply(Row.withSchema(TEST_SCHEMA).withFieldValue("aDouble", 4.0).build())); + } + + public static class TestFunction implements Function { + @Override + public Double apply(Row row) { + return 1.0 / row.getDouble("aDouble"); + } + } + + public static double staticTestMethod(Row row) { + return 1.0 / row.getDouble("aDouble"); + } + + public static class TestClassWithMethod { + public double testMethod(Row row) { + return 1.0 / row.getDouble("aDouble"); + } + } + + @Test + public void testNamedFunctionUdf() + throws MalformedURLException, ReflectiveOperationException, StringCompiler.CompileException { + JavaRowUdf udf = + new JavaRowUdf( + JavaRowUdf.Configuration.builder() + .setName(getClass().getTypeName() + "$TestFunction") + .build(), + TEST_SCHEMA); + assertEquals(Schema.FieldType.DOUBLE, udf.getOutputType()); + assertEquals( + 0.25, + udf.getFunction() + .apply(Row.withSchema(TEST_SCHEMA).withFieldValue("aDouble", 4.0).build())); + } + + @Test + public void testClassMethodUdf() + throws MalformedURLException, ReflectiveOperationException, StringCompiler.CompileException { + JavaRowUdf udf = + new JavaRowUdf( + JavaRowUdf.Configuration.builder() + .setName(getClass().getTypeName() + "$TestClassWithMethod::testMethod") + .build(), + TEST_SCHEMA); + assertEquals(Schema.FieldType.DOUBLE, udf.getOutputType()); + assertEquals( + 0.25, + udf.getFunction() + .apply(Row.withSchema(TEST_SCHEMA).withFieldValue("aDouble", 4.0).build())); + } + + @Test + public void testStaticMethodUdf() + throws MalformedURLException, ReflectiveOperationException, StringCompiler.CompileException { + JavaRowUdf udf = + new JavaRowUdf( + JavaRowUdf.Configuration.builder() + .setName(getClass().getTypeName() + "::staticTestMethod") + .build(), + TEST_SCHEMA); + 
assertEquals(Schema.FieldType.DOUBLE, udf.getOutputType()); + assertEquals( + 0.25, + udf.getFunction() + .apply(Row.withSchema(TEST_SCHEMA).withFieldValue("aDouble", 4.0).build())); + } +} diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/schemas/transforms/providers/StringCompilerTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/schemas/transforms/providers/StringCompilerTest.java new file mode 100644 index 000000000000..322bd276bb14 --- /dev/null +++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/schemas/transforms/providers/StringCompilerTest.java @@ -0,0 +1,70 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.beam.sdk.schemas.transforms.providers; + +import static org.junit.Assert.*; + +import java.util.function.Function; +import org.apache.beam.sdk.values.Row; +import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.ImmutableMap; +import org.junit.Test; + +public class StringCompilerTest { + + public static final String SQUARE_SOURCE = + "import java.util.function.Function;" + + "public class Square implements Function {" + + " public Integer apply(Integer x) { return x * x; }" + + "}"; + + @Test + public void testGetClass() throws Exception { + Class clazz = StringCompiler.getClass("Square", SQUARE_SOURCE); + assertTrue(Function.class.isAssignableFrom(clazz)); + assertEquals("Square", clazz.getSimpleName()); + } + + @Test + public void testGetInstance() throws Exception { + Function square = + (Function) StringCompiler.getInstance("Square", SQUARE_SOURCE); + assertEquals(4, (int) square.apply(2)); + } + + @Test + public void testGuessExpressionType() throws Exception { + assertEquals( + double.class, + StringCompiler.guessExpressionType( + "a+b", ImmutableMap.of("a", int.class, "b", double.class))); + assertEquals( + double.class, + StringCompiler.guessExpressionType( + "a > 0 ? a : b", ImmutableMap.of("a", int.class, "b", double.class))); + assertEquals( + double.class, + StringCompiler.guessExpressionType("a * Math.random()", ImmutableMap.of("a", int.class))); + assertEquals( + int.class, + StringCompiler.guessExpressionType("(int) a", ImmutableMap.of("a", double.class))); + assertEquals( + long.class, + StringCompiler.guessExpressionType( + "a.getInt64(\"foo\")+b", ImmutableMap.of("a", Row.class, "b", int.class))); + } +} From 9aa3bb8d1e6b45e7abd2afb2e06aba69207536e1 Mon Sep 17 00:00:00 2001 From: Robert Bradshaw Date: Thu, 28 Sep 2023 12:54:03 -0700 Subject: [PATCH 2/9] Workaround javac classpath issue. 
--- .../JavaMapToFieldsTransformProvider.java | 2 +- .../transforms/providers/StringCompiler.java | 43 ++++++++++++++++++- 2 files changed, 43 insertions(+), 2 deletions(-) diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/transforms/providers/JavaMapToFieldsTransformProvider.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/transforms/providers/JavaMapToFieldsTransformProvider.java index 7bfdcd5ccfb4..b8bdfc56a533 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/transforms/providers/JavaMapToFieldsTransformProvider.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/transforms/providers/JavaMapToFieldsTransformProvider.java @@ -174,7 +174,7 @@ public PCollectionRowTuple expand(PCollectionRowTuple input) { } for (Map.Entry entry : configuration.getFields().entrySet()) { - if ("generic".equals(configuration.getLanguage())) { + if (!"java".equals(configuration.getLanguage())) { String expr = entry.getValue().getExpression(); if (expr == null || !inputSchema.hasField(expr)) { throw new IllegalArgumentException( diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/transforms/providers/StringCompiler.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/transforms/providers/StringCompiler.java index f568ced53167..bda477d538d8 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/transforms/providers/StringCompiler.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/transforms/providers/StringCompiler.java @@ -18,15 +18,26 @@ package org.apache.beam.sdk.schemas.transforms.providers; import java.io.ByteArrayOutputStream; +import java.io.File; +import java.io.IOException; import java.io.OutputStream; import java.lang.reflect.Method; import java.lang.reflect.Type; import java.net.URI; +import java.net.URL; +import java.net.URLClassLoader; import java.security.SecureClassLoader; +import java.util.ArrayList; import java.util.Collections; import 
java.util.HashMap; +import java.util.List; import java.util.Locale; import java.util.Map; +import java.util.function.Supplier; +import java.util.jar.Attributes; +import java.util.jar.Manifest; +import java.util.zip.ZipEntry; +import java.util.zip.ZipFile; import javax.tools.Diagnostic; import javax.tools.DiagnosticCollector; import javax.tools.FileObject; @@ -36,8 +47,38 @@ import javax.tools.SimpleJavaFileObject; import javax.tools.StandardJavaFileManager; import javax.tools.ToolProvider; +import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Suppliers; +import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.ImmutableList; public class StringCompiler { + @SuppressWarnings({ + "nullness" // TODO(https://github.com/apache/beam/issues/20497) + }) + private static final Supplier classpathSupplier = + Suppliers.memoize( + () -> { + List cp = new ArrayList<>(); + cp.add(System.getProperty("java.class.path")); + // Javac doesn't properly handle manifest classpath spec. 
+ ClassLoader cl = StringCompiler.class.getClassLoader(); + if (cl == null) cl = ClassLoader.getSystemClassLoader(); + if (cl instanceof URLClassLoader) { + for (URL url : ((URLClassLoader) cl).getURLs()) { + try { + ZipFile zipFile = new ZipFile(new File(url.getFile())); + ZipEntry manifestEntry = zipFile.getEntry("META-INF/MANIFEST.MF"); + if (manifestEntry != null) { + Manifest manifest = new Manifest(zipFile.getInputStream(manifestEntry)); + cp.add(manifest.getMainAttributes().getValue(Attributes.Name.CLASS_PATH)); + } + } catch (IOException exn) { + throw new RuntimeException(exn); + } + } + } + return String.join(System.getProperty("path.separator"), cp); + }); + public static class CompileException extends Exception { private final DiagnosticCollector diagnostics; @@ -63,7 +104,7 @@ public static Class getClass(String name, String source) null, fileManager, diagnostics, - null, + ImmutableList.of("-classpath", classpathSupplier.get()), null, Collections.singletonList(new InMemoryFileManager.InputJavaFileObject(name, source))); boolean result = task.call(); From 99d04253018cdec517d3eba211188b64776b18cb Mon Sep 17 00:00:00 2001 From: Robert Bradshaw Date: Thu, 28 Sep 2023 12:56:43 -0700 Subject: [PATCH 3/9] Add java map providers to standard providers listing. 
--- sdks/python/apache_beam/yaml/standard_providers.yaml | 2 ++ 1 file changed, 2 insertions(+) diff --git a/sdks/python/apache_beam/yaml/standard_providers.yaml b/sdks/python/apache_beam/yaml/standard_providers.yaml index cdb4036f98c2..19640bb0fbec 100644 --- a/sdks/python/apache_beam/yaml/standard_providers.yaml +++ b/sdks/python/apache_beam/yaml/standard_providers.yaml @@ -24,3 +24,5 @@ version: BEAM_VERSION transforms: Sql: 'beam:external:java:sql:v1' + MapToFields-java: "beam:schematransform:org.apache.beam:yaml:map_to_fields-java:v1" + MapToFields-generic: "beam:schematransform:org.apache.beam:yaml:map_to_fields-java:v1" From 1e727ad0674824e74fa7e6c21b6300137587488f Mon Sep 17 00:00:00 2001 From: Robert Bradshaw Date: Thu, 28 Sep 2023 17:03:40 -0700 Subject: [PATCH 4/9] checkStyle --- .../beam/sdk/schemas/transforms/providers/JavaRowUdfTest.java | 4 ++-- .../sdk/schemas/transforms/providers/StringCompilerTest.java | 3 ++- 2 files changed, 4 insertions(+), 3 deletions(-) diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/schemas/transforms/providers/JavaRowUdfTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/schemas/transforms/providers/JavaRowUdfTest.java index 68cbbc8b1059..78ee36e7ca54 100644 --- a/sdks/java/core/src/test/java/org/apache/beam/sdk/schemas/transforms/providers/JavaRowUdfTest.java +++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/schemas/transforms/providers/JavaRowUdfTest.java @@ -17,7 +17,7 @@ */ package org.apache.beam.sdk.schemas.transforms.providers; -import static org.junit.Assert.*; +import static org.junit.Assert.assertEquals; import java.net.MalformedURLException; import java.util.function.Function; @@ -27,7 +27,7 @@ public class JavaRowUdfTest { - public final Schema TEST_SCHEMA = + public static final Schema TEST_SCHEMA = Schema.of( Schema.Field.of("anInt32", Schema.FieldType.INT32).withNullable(true), Schema.Field.of("anInt64", Schema.FieldType.INT64).withNullable(true), diff --git 
a/sdks/java/core/src/test/java/org/apache/beam/sdk/schemas/transforms/providers/StringCompilerTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/schemas/transforms/providers/StringCompilerTest.java index 322bd276bb14..0c7bb4fa0538 100644 --- a/sdks/java/core/src/test/java/org/apache/beam/sdk/schemas/transforms/providers/StringCompilerTest.java +++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/schemas/transforms/providers/StringCompilerTest.java @@ -17,7 +17,8 @@ */ package org.apache.beam.sdk.schemas.transforms.providers; -import static org.junit.Assert.*; +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertTrue; import java.util.function.Function; import org.apache.beam.sdk.values.Row; From 43fdc5726b1bafb9fc878b60ecb44fc351f02d15 Mon Sep 17 00:00:00 2001 From: Robert Bradshaw Date: Fri, 29 Sep 2023 08:57:06 -0700 Subject: [PATCH 5/9] checkstyle --- .../providers/JavaMapToFieldsTransformProvider.java | 2 ++ .../beam/sdk/schemas/transforms/providers/StringCompiler.java | 4 +++- 2 files changed, 5 insertions(+), 1 deletion(-) diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/transforms/providers/JavaMapToFieldsTransformProvider.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/transforms/providers/JavaMapToFieldsTransformProvider.java index b8bdfc56a533..62bf16c130a3 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/transforms/providers/JavaMapToFieldsTransformProvider.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/transforms/providers/JavaMapToFieldsTransformProvider.java @@ -96,6 +96,7 @@ public abstract static class Configuration { public abstract Map getFields(); @Nullable + @SuppressWarnings("all") public abstract ErrorHandling getError_handling(); public static Builder builder() { @@ -113,6 +114,7 @@ public abstract static class Builder { public abstract Builder setFields(Map fields); + @SuppressWarnings("all") public abstract Builder 
setError_handling(ErrorHandling error_handling); public abstract Configuration build(); diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/transforms/providers/StringCompiler.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/transforms/providers/StringCompiler.java index bda477d538d8..5d7b2f327bb1 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/transforms/providers/StringCompiler.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/transforms/providers/StringCompiler.java @@ -61,7 +61,9 @@ public class StringCompiler { cp.add(System.getProperty("java.class.path")); // Javac doesn't properly handle manifest classpath spec. ClassLoader cl = StringCompiler.class.getClassLoader(); - if (cl == null) cl = ClassLoader.getSystemClassLoader(); + if (cl == null) { + cl = ClassLoader.getSystemClassLoader(); + } if (cl instanceof URLClassLoader) { for (URL url : ((URLClassLoader) cl).getURLs()) { try { From f5aede2efe28d6213da9ff5e114bc2cbb8e0d513 Mon Sep 17 00:00:00 2001 From: Robert Bradshaw Date: Fri, 29 Sep 2023 09:50:18 -0700 Subject: [PATCH 6/9] package info --- .../transforms/providers/package-info.java | 27 +++++++++++++++++++ 1 file changed, 27 insertions(+) create mode 100644 sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/transforms/providers/package-info.java diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/transforms/providers/package-info.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/transforms/providers/package-info.java new file mode 100644 index 000000000000..6c5d1cb7c570 --- /dev/null +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/transforms/providers/package-info.java @@ -0,0 +1,27 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +/** + * Defines transforms that work on PCollections with schemas. + * + * <p>
For further details, see the documentation for each class in this package. + */ +@DefaultAnnotation(NonNull.class) +package org.apache.beam.sdk.schemas.transforms.providers; + +import edu.umd.cs.findbugs.annotations.DefaultAnnotation; +import org.checkerframework.checker.nullness.qual.NonNull; From 461db063e2ecac9cb886264625682904725bf345 Mon Sep 17 00:00:00 2001 From: Robert Bradshaw Date: Fri, 6 Oct 2023 08:52:52 -0700 Subject: [PATCH 7/9] fix some warnings --- .../transforms/providers/JavaRowUdf.java | 6 ++- .../transforms/providers/StringCompiler.java | 47 +++++++++++-------- .../JavaMapToFieldsTransformProviderTest.java | 2 + ...ionServiceSchemaTransformProviderTest.java | 3 +- 4 files changed, 35 insertions(+), 23 deletions(-) diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/transforms/providers/JavaRowUdf.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/transforms/providers/JavaRowUdf.java index 8dbe0f21fd9f..d3538c6af91d 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/transforms/providers/JavaRowUdf.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/transforms/providers/JavaRowUdf.java @@ -20,6 +20,7 @@ import static org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Preconditions.checkArgument; import com.google.auto.value.AutoValue; +import edu.umd.cs.findbugs.annotations.SuppressFBWarnings; import java.io.Serializable; import java.lang.reflect.InvocationTargetException; import java.lang.reflect.Method; @@ -57,7 +58,8 @@ public class JavaRowUdf implements Serializable { // Transient so we don't have to worry about issues serializing these dynamically created classes. // While this is lazily computed, it is always computed on class construction, so any errors - // should still be caught at construction time. + // should still be caught at construction time, and lazily re-computed before any use. 
+ @SuppressFBWarnings("SE_TRANSIENT_FIELD_NOT_RESTORED") private transient Function function; // Find or implement the inverse of StaticSchemaInference.fieldFromType @@ -207,7 +209,7 @@ private static FunctionAndType createFunctionFromExpression(String expression, S for (Map.Entry fieldEntry : fieldTypes.entrySet()) { source.append( String.format( - " %s %s = (%s) __row__.getValue(%s);\n", + " %s %s = (%s) __row__.getValue(%s);%n", fieldEntry.getValue().getTypeName(), fieldEntry.getKey(), fieldEntry.getValue().getTypeName(), diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/transforms/providers/StringCompiler.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/transforms/providers/StringCompiler.java index 5d7b2f327bb1..04730dce80c0 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/transforms/providers/StringCompiler.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/transforms/providers/StringCompiler.java @@ -26,6 +26,8 @@ import java.net.URI; import java.net.URL; import java.net.URLClassLoader; +import java.security.AccessController; +import java.security.PrivilegedAction; import java.security.SecureClassLoader; import java.util.ArrayList; import java.util.Collections; @@ -66,15 +68,17 @@ public class StringCompiler { } if (cl instanceof URLClassLoader) { for (URL url : ((URLClassLoader) cl).getURLs()) { - try { - ZipFile zipFile = new ZipFile(new File(url.getFile())); - ZipEntry manifestEntry = zipFile.getEntry("META-INF/MANIFEST.MF"); - if (manifestEntry != null) { - Manifest manifest = new Manifest(zipFile.getInputStream(manifestEntry)); - cp.add(manifest.getMainAttributes().getValue(Attributes.Name.CLASS_PATH)); + File file = new File(url.getFile()); + if (file.exists() && !file.isDirectory()) { + try (ZipFile zipFile = new ZipFile(new File(url.getFile()))) { + ZipEntry manifestEntry = zipFile.getEntry("META-INF/MANIFEST.MF"); + if (manifestEntry != null) { + Manifest manifest = new 
Manifest(zipFile.getInputStream(manifestEntry)); + cp.add(manifest.getMainAttributes().getValue(Attributes.Name.CLASS_PATH)); + } + } catch (IOException exn) { + throw new RuntimeException(exn); } - } catch (IOException exn) { - throw new RuntimeException(exn); } } } @@ -200,18 +204,21 @@ public JavaFileObject getJavaFileForOutput( } public ClassLoader getClassLoader() { - return new SecureClassLoader() { - @Override - protected Class findClass(String name) throws ClassNotFoundException { - OutputJavaFileObject fileObject = outputFileObjects.get(name); - if (fileObject == null) { - throw new ClassNotFoundException(name); - } else { - byte[] classBytes = fileObject.getBytes(); - return defineClass(name, classBytes, 0, classBytes.length); - } - } - }; + return AccessController.doPrivileged( + (PrivilegedAction) + () -> + new SecureClassLoader() { + @Override + protected Class findClass(String name) throws ClassNotFoundException { + OutputJavaFileObject fileObject = outputFileObjects.get(name); + if (fileObject == null) { + throw new ClassNotFoundException(name); + } else { + byte[] classBytes = fileObject.getBytes(); + return defineClass(name, classBytes, 0, classBytes.length); + } + } + }); } @Override diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/schemas/transforms/providers/JavaMapToFieldsTransformProviderTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/schemas/transforms/providers/JavaMapToFieldsTransformProviderTest.java index f90950407c20..7b3f311e7cd0 100644 --- a/sdks/java/core/src/test/java/org/apache/beam/sdk/schemas/transforms/providers/JavaMapToFieldsTransformProviderTest.java +++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/schemas/transforms/providers/JavaMapToFieldsTransformProviderTest.java @@ -108,6 +108,7 @@ public void testAppendAndDropFields() { new JavaMapToFieldsTransformProvider() .from( JavaMapToFieldsTransformProvider.Configuration.builder() + .setLanguage("java") .setAppend(true) 
.setDrop(Collections.singletonList("b")) .setFields( @@ -151,6 +152,7 @@ public void testErrorHandling() { new JavaMapToFieldsTransformProvider() .from( JavaMapToFieldsTransformProvider.Configuration.builder() + .setLanguage("java") .setFields( ImmutableMap.of( "sqrt", diff --git a/sdks/java/expansion-service/src/test/java/org/apache/beam/sdk/expansion/service/ExpansionServiceSchemaTransformProviderTest.java b/sdks/java/expansion-service/src/test/java/org/apache/beam/sdk/expansion/service/ExpansionServiceSchemaTransformProviderTest.java index 141d2b48b105..3b9de84fa8fc 100644 --- a/sdks/java/expansion-service/src/test/java/org/apache/beam/sdk/expansion/service/ExpansionServiceSchemaTransformProviderTest.java +++ b/sdks/java/expansion-service/src/test/java/org/apache/beam/sdk/expansion/service/ExpansionServiceSchemaTransformProviderTest.java @@ -19,6 +19,7 @@ import static org.apache.beam.runners.core.construction.BeamUrns.getUrn; import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertTrue; import com.google.auto.service.AutoService; import java.io.IOException; @@ -296,7 +297,7 @@ public void testSchemaTransformDiscovery() { ExpansionApi.DiscoverSchemaTransformRequest.newBuilder().build(); ExpansionApi.DiscoverSchemaTransformResponse response = expansionService.discover(discoverRequest); - assertEquals(2, response.getSchemaTransformConfigsCount()); + assertTrue(response.getSchemaTransformConfigsCount() >= 2); } private void verifyLeafTransforms(ExpansionApi.ExpansionResponse response, int count) { From 443b40041c4530bd0422fd30a63778cc567846b9 Mon Sep 17 00:00:00 2001 From: Robert Bradshaw Date: Fri, 20 Oct 2023 16:58:00 -0700 Subject: [PATCH 8/9] fix naming conventions --- .../JavaMapToFieldsTransformProvider.java | 12 ++++----- .../JavaMapToFieldsTransformProviderTest.java | 2 +- .../apache_beam/yaml/standard_providers.yaml | 26 +++++++++++++++++++ 3 files changed, 32 insertions(+), 8 deletions(-) diff --git 
a/sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/transforms/providers/JavaMapToFieldsTransformProvider.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/transforms/providers/JavaMapToFieldsTransformProvider.java index 62bf16c130a3..ddf892f03fc2 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/transforms/providers/JavaMapToFieldsTransformProvider.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/transforms/providers/JavaMapToFieldsTransformProvider.java @@ -96,8 +96,7 @@ public abstract static class Configuration { public abstract Map getFields(); @Nullable - @SuppressWarnings("all") - public abstract ErrorHandling getError_handling(); + public abstract ErrorHandling getErrorHandling(); public static Builder builder() { return new AutoValue_JavaMapToFieldsTransformProvider_Configuration.Builder(); @@ -114,8 +113,7 @@ public abstract static class Builder { public abstract Builder setFields(Map fields); - @SuppressWarnings("all") - public abstract Builder setError_handling(ErrorHandling error_handling); + public abstract Builder setErrorHandling(ErrorHandling errorHandling); public abstract Configuration build(); } @@ -195,8 +193,8 @@ public PCollectionRowTuple expand(PCollectionRowTuple input) { } Schema outputSchema = outputSchemaBuilder.build(); boolean handleErrors = - configuration.getError_handling() != null - && configuration.getError_handling().getOutput() != null; + configuration.getErrorHandling() != null + && configuration.getErrorHandling().getOutput() != null; Schema errorSchema = Schema.of( Schema.Field.of("failed_row", Schema.FieldType.row(inputSchema)), @@ -215,7 +213,7 @@ public PCollectionRowTuple expand(PCollectionRowTuple input) { PCollectionRowTuple result = PCollectionRowTuple.of(OUTPUT_ROWS_TAG, pcolls.get(mappedValues)); if (handleErrors) { - result = result.and(configuration.getError_handling().getOutput(), pcolls.get(errorValues)); + result = 
result.and(configuration.getErrorHandling().getOutput(), pcolls.get(errorValues)); } return result; } diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/schemas/transforms/providers/JavaMapToFieldsTransformProviderTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/schemas/transforms/providers/JavaMapToFieldsTransformProviderTest.java index 7b3f311e7cd0..64fc48564ccf 100644 --- a/sdks/java/core/src/test/java/org/apache/beam/sdk/schemas/transforms/providers/JavaMapToFieldsTransformProviderTest.java +++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/schemas/transforms/providers/JavaMapToFieldsTransformProviderTest.java @@ -171,7 +171,7 @@ public void testErrorHandling() { + " }" + "}") .build())) - .setError_handling( + .setErrorHandling( JavaMapToFieldsTransformProvider.Configuration.ErrorHandling .builder() .setOutput("errors") diff --git a/sdks/python/apache_beam/yaml/standard_providers.yaml b/sdks/python/apache_beam/yaml/standard_providers.yaml index 19640bb0fbec..01049569edec 100644 --- a/sdks/python/apache_beam/yaml/standard_providers.yaml +++ b/sdks/python/apache_beam/yaml/standard_providers.yaml @@ -26,3 +26,29 @@ Sql: 'beam:external:java:sql:v1' MapToFields-java: "beam:schematransform:org.apache.beam:yaml:map_to_fields-java:v1" MapToFields-generic: "beam:schematransform:org.apache.beam:yaml:map_to_fields-java:v1" + +- type: renaming + transforms: + 'MapToFields-java': 'MapToFields-java' + 'MapToFields-generic': 'MapToFields-generic' + config: + mappings: + 'MapToFields-generic': + language: 'language' + append: 'append' + drop: 'drop' + fields: 'fields' + error_handling: 'errorHandling' + 'MapToFields-java': + language: 'language' + append: 'append' + drop: 'drop' + fields: 'fields' + error_handling: 'errorHandling' + underlying_provider: + type: beamJar + transforms: + MapToFields-java: "beam:schematransform:org.apache.beam:yaml:map_to_fields-java:v1" + MapToFields-generic: 
"beam:schematransform:org.apache.beam:yaml:map_to_fields-java:v1" + config: + gradle_target: 'sdks:java:extensions:sql:expansion-service:shadowJar' From 5fd65321eb5cbeb947d04f523b0632ee475b1ab1 Mon Sep 17 00:00:00 2001 From: Robert Bradshaw Date: Tue, 31 Oct 2023 09:15:10 -0700 Subject: [PATCH 9/9] Allow fetching jars from non-local filesystems. --- .../transforms/providers/JavaRowUdf.java | 19 +++++++++++++++++++ 1 file changed, 19 insertions(+) diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/transforms/providers/JavaRowUdf.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/transforms/providers/JavaRowUdf.java index d3538c6af91d..e18df5b29a59 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/transforms/providers/JavaRowUdf.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/transforms/providers/JavaRowUdf.java @@ -21,6 +21,8 @@ import com.google.auto.value.AutoValue; import edu.umd.cs.findbugs.annotations.SuppressFBWarnings; +import java.io.File; +import java.io.IOException; import java.io.Serializable; import java.lang.reflect.InvocationTargetException; import java.lang.reflect.Method; @@ -30,6 +32,9 @@ import java.net.MalformedURLException; import java.net.URL; import java.net.URLClassLoader; +import java.nio.channels.FileChannel; +import java.nio.channels.ReadableByteChannel; +import java.nio.file.StandardOpenOption; import java.util.Collections; import java.util.HashMap; import java.util.List; @@ -38,6 +43,7 @@ import java.util.regex.Matcher; import java.util.regex.Pattern; import javax.annotation.Nullable; +import org.apache.beam.sdk.io.FileSystems; import org.apache.beam.sdk.schemas.AutoValueSchema; import org.apache.beam.sdk.schemas.FieldValueTypeInformation; import org.apache.beam.sdk.schemas.Schema; @@ -50,6 +56,7 @@ import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Preconditions; import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Strings; 
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.ImmutableMap; +import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.io.ByteStreams; public class JavaRowUdf implements Serializable { private final Configuration config; @@ -240,6 +247,18 @@ private static FunctionAndType createFuctionFromCallable(String callable) }) private static FunctionAndType createFunctionFromName(String name, String path) throws ReflectiveOperationException, MalformedURLException { + if (path != null && !new File(path).exists()) { + try (ReadableByteChannel inChannel = + FileSystems.open(FileSystems.matchNewResource(path, false))) { + File tmpJar = File.createTempFile("map-to-fields-" + name, ".jar"); + try (FileChannel outChannel = FileChannel.open(tmpJar.toPath(), StandardOpenOption.WRITE)) { + ByteStreams.copy(inChannel, outChannel); + } + path = tmpJar.getPath(); + } catch (IOException exn) { + throw new RuntimeException(exn); + } + } ClassLoader classLoader = path == null ? ClassLoader.getSystemClassLoader()