From 54673996c9bf2ee076b04833bbae2729d6cebbaf Mon Sep 17 00:00:00 2001 From: Ahmed Abualsaud <65791736+ahmedabu98@users.noreply.github.com> Date: Mon, 8 Apr 2024 06:55:17 -0400 Subject: [PATCH] [Java] ManagedIO (#30808) * managed api for java * yaml utils --- build.gradle.kts | 1 + sdks/java/core/build.gradle | 1 + .../beam/sdk/schemas/utils/YamlUtils.java | 171 +++++++++++++ .../apache/beam/sdk/util/YamlUtilsTest.java | 228 ++++++++++++++++++ sdks/java/managed/build.gradle | 37 +++ .../org/apache/beam/sdk/managed/Managed.java | 195 +++++++++++++++ .../ManagedSchemaTransformProvider.java | 183 ++++++++++++++ .../apache/beam/sdk/managed/package-info.java | 20 ++ .../ManagedSchemaTransformProviderTest.java | 103 ++++++++ .../apache/beam/sdk/managed/ManagedTest.java | 114 +++++++++ .../managed/TestSchemaTransformProvider.java | 98 ++++++++ .../src/test/resources/test_config.yaml | 21 ++ settings.gradle.kts | 2 + 13 files changed, 1174 insertions(+) create mode 100644 sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/utils/YamlUtils.java create mode 100644 sdks/java/core/src/test/java/org/apache/beam/sdk/util/YamlUtilsTest.java create mode 100644 sdks/java/managed/build.gradle create mode 100644 sdks/java/managed/src/main/java/org/apache/beam/sdk/managed/Managed.java create mode 100644 sdks/java/managed/src/main/java/org/apache/beam/sdk/managed/ManagedSchemaTransformProvider.java create mode 100644 sdks/java/managed/src/main/java/org/apache/beam/sdk/managed/package-info.java create mode 100644 sdks/java/managed/src/test/java/org/apache/beam/sdk/managed/ManagedSchemaTransformProviderTest.java create mode 100644 sdks/java/managed/src/test/java/org/apache/beam/sdk/managed/ManagedTest.java create mode 100644 sdks/java/managed/src/test/java/org/apache/beam/sdk/managed/TestSchemaTransformProvider.java create mode 100644 sdks/java/managed/src/test/resources/test_config.yaml diff --git a/build.gradle.kts b/build.gradle.kts index ded692677b53..9c42ffdc8cea 100644 --- a/build.gradle.kts +++ b/build.gradle.kts @@ -303,6 +303,7 @@ tasks.register("javaPreCommit") { dependsOn(":sdks:java:io:synthetic:build") dependsOn(":sdks:java:io:xml:build") dependsOn(":sdks:java:javadoc:allJavadoc") + dependsOn(":sdks:java:managed:build") dependsOn(":sdks:java:testing:expansion-service:build") dependsOn(":sdks:java:testing:jpms-tests:build") dependsOn(":sdks:java:testing:load-tests:build") diff --git a/sdks/java/core/build.gradle b/sdks/java/core/build.gradle index 438a3fb1806c..5a47cb5237ea 100644 --- a/sdks/java/core/build.gradle +++ b/sdks/java/core/build.gradle @@ -98,6 +98,7 @@ dependencies { permitUnusedDeclared enforcedPlatform(library.java.google_cloud_platform_libraries_bom) provided library.java.json_org implementation library.java.everit_json_schema + implementation "org.yaml:snakeyaml:2.0" shadowTest library.java.everit_json_schema provided library.java.junit testImplementation "com.github.stefanbirkner:system-rules:1.19.0" diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/utils/YamlUtils.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/utils/YamlUtils.java new file mode 100644 index 000000000000..5c05b2bed396 --- /dev/null +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/utils/YamlUtils.java @@ -0,0 +1,171 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.beam.sdk.schemas.utils; + +import static org.apache.beam.sdk.values.Row.toRow; + +import java.math.BigDecimal; +import java.util.List; +import java.util.Map; +import java.util.function.Function; +import java.util.stream.Collectors; +import org.apache.beam.sdk.schemas.Schema; +import org.apache.beam.sdk.schemas.Schema.Field; +import org.apache.beam.sdk.schemas.Schema.FieldType; +import org.apache.beam.sdk.values.Row; +import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.CaseFormat; +import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Preconditions; +import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.ImmutableMap; +import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.io.BaseEncoding; +import org.checkerframework.checker.nullness.qual.Nullable; +import org.yaml.snakeyaml.Yaml; + +public class YamlUtils { + private static final Map> YAML_VALUE_PARSERS = + ImmutableMap + .> + builder() + .put(Schema.TypeName.BYTE, Byte::valueOf) + .put(Schema.TypeName.INT16, Short::valueOf) + .put(Schema.TypeName.INT32, Integer::valueOf) + .put(Schema.TypeName.INT64, Long::valueOf) + .put(Schema.TypeName.FLOAT, Float::valueOf) + .put(Schema.TypeName.DOUBLE, Double::valueOf) + .put(Schema.TypeName.DECIMAL, BigDecimal::new) + .put(Schema.TypeName.BOOLEAN, Boolean::valueOf) + .put(Schema.TypeName.STRING, str -> str) + .put(Schema.TypeName.BYTES, str -> BaseEncoding.base64().decode(str)) + .build(); + + public static Row toBeamRow(@Nullable String yamlString, Schema schema) { + return toBeamRow(yamlString, schema, false); + } + + public static Row toBeamRow( + @Nullable String yamlString, Schema schema, boolean convertNamesToCamelCase) { + if (yamlString == null || yamlString.isEmpty()) { + List requiredFields = + schema.getFields().stream() + .filter(field -> !field.getType().getNullable()) + .collect(Collectors.toList()); + if (requiredFields.isEmpty()) { + return Row.nullRow(schema); + } else { + throw new IllegalArgumentException( + String.format( + "Received an empty YAML string, but output schema contains required fields: %s", + requiredFields)); + } + } + Yaml yaml = new Yaml(); + Object yamlMap = yaml.load(yamlString); + + Preconditions.checkArgument( + yamlMap instanceof Map, + "Expected a YAML mapping but got type '%s' instead.", + Preconditions.checkNotNull(yamlMap).getClass()); + + return toBeamRow( + (Map) Preconditions.checkNotNull(yamlMap), schema, convertNamesToCamelCase); + } + + private static @Nullable Object toBeamValue( + Field field, @Nullable Object yamlValue, boolean convertNamesToCamelCase) { + FieldType fieldType = field.getType(); + + if (yamlValue == null) { + if (fieldType.getNullable()) { + return null; + } else { + throw new IllegalArgumentException( + "Received null value for non-nullable field \"" + field.getName() + "\""); + } + } + + if (yamlValue instanceof String + || yamlValue instanceof Number + || yamlValue instanceof Boolean) { + String yamlStringValue = yamlValue.toString(); + if (YAML_VALUE_PARSERS.containsKey(fieldType.getTypeName())) { + return YAML_VALUE_PARSERS.get(fieldType.getTypeName()).apply(yamlStringValue); + } + } + + if (yamlValue instanceof byte[] && fieldType.getTypeName() == Schema.TypeName.BYTES) { + return yamlValue; + } + + if (yamlValue instanceof List) { + FieldType innerType = + Preconditions.checkNotNull( + fieldType.getCollectionElementType(), + "Cannot convert YAML type '%s` to `%s` because the YAML value is a List, but the output schema field does not define a collection type.", + yamlValue.getClass(), + fieldType); + return ((List) yamlValue) + .stream() + .map( + v -> + Preconditions.checkNotNull( + toBeamValue(field.withType(innerType), v, convertNamesToCamelCase))) + .collect(Collectors.toList()); + } + + if (yamlValue instanceof Map) { + if (fieldType.getTypeName() == Schema.TypeName.ROW) { + Schema nestedSchema = + Preconditions.checkNotNull( + fieldType.getRowSchema(), + "Received a YAML '%s' type, but output schema field '%s' does not define a Row Schema", + yamlValue.getClass(), + fieldType); + return toBeamRow((Map) yamlValue, nestedSchema, convertNamesToCamelCase); + } else if (fieldType.getTypeName() == Schema.TypeName.MAP) { + return yamlValue; + } + } + + throw new UnsupportedOperationException( + String.format( + "Converting YAML type '%s' to '%s' is not supported", yamlValue.getClass(), fieldType)); + } + + @SuppressWarnings("nullness") + public static Row toBeamRow(Map yamlMap, Schema rowSchema, boolean toCamelCase) { + return rowSchema.getFields().stream() + .map( + field -> + toBeamValue( + field, + yamlMap.get(maybeGetSnakeCase(field.getName(), toCamelCase)), + toCamelCase)) + .collect(toRow(rowSchema)); + } + + private static String maybeGetSnakeCase(String str, boolean getSnakeCase) { + return getSnakeCase ? CaseFormat.LOWER_CAMEL.to(CaseFormat.LOWER_UNDERSCORE, str) : str; + } + + public static String yamlStringFromMap(@Nullable Map map) { + if (map == null || map.isEmpty()) { + return ""; + } + return new Yaml().dumpAsMap(map); + } +} diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/util/YamlUtilsTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/util/YamlUtilsTest.java new file mode 100644 index 000000000000..6e6984dde3a6 --- /dev/null +++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/util/YamlUtilsTest.java @@ -0,0 +1,228 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.beam.sdk.util; + +import static org.junit.Assert.assertEquals; + +import java.math.BigDecimal; +import java.util.Arrays; +import java.util.stream.Collectors; +import java.util.stream.IntStream; +import org.apache.beam.sdk.schemas.Schema; +import org.apache.beam.sdk.schemas.utils.YamlUtils; +import org.apache.beam.sdk.values.Row; +import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.io.BaseEncoding; +import org.junit.Rule; +import org.junit.Test; +import org.junit.rules.ExpectedException; +import org.junit.runner.RunWith; +import org.junit.runners.JUnit4; + +@RunWith(JUnit4.class) +public class YamlUtilsTest { + @Rule public transient ExpectedException thrown = ExpectedException.none(); + + public String makeNested(String input) { + return Arrays.stream(input.split("\n")) + .map(str -> " " + str) + .collect(Collectors.joining("\n")); + } + + @Test + public void testEmptyYamlString() { + Schema schema = Schema.builder().build(); + + assertEquals(Row.nullRow(schema), YamlUtils.toBeamRow("", schema)); + } + + @Test + public void testInvalidEmptyYamlWithNonEmptySchema() { + Schema schema = Schema.builder().addStringField("dummy").build(); + + thrown.expect(IllegalArgumentException.class); + thrown.expectMessage( + "Received an empty YAML string, but output schema contains required fields"); + thrown.expectMessage("dummy"); + + YamlUtils.toBeamRow("", schema); + } + + @Test + public void testNullableValues() { + String yamlString = "nullable_string:\n" + "nullable_integer:\n" + "nullable_boolean:\n"; + Schema schema = + Schema.builder() + .addNullableStringField("nullable_string") + .addNullableInt32Field("nullable_integer") + .addNullableBooleanField("nullable_boolean") + .build(); + + assertEquals(Row.nullRow(schema), YamlUtils.toBeamRow(yamlString, schema)); + } + + @Test + public void testMissingNullableValues() { + String yamlString = "nullable_string:"; + Schema schema = + Schema.builder() + .addNullableStringField("nullable_string") + .addNullableInt32Field("nullable_integer") + .addNullableBooleanField("nullable_boolean") + .build(); + + assertEquals(Row.nullRow(schema), YamlUtils.toBeamRow(yamlString, schema)); + } + + @Test + public void testInvalidNullableValues() { + String yamlString = "nullable_string:\n" + "integer:"; + Schema schema = + Schema.builder().addNullableStringField("nullable_string").addInt32Field("integer").build(); + + thrown.expect(IllegalArgumentException.class); + thrown.expectMessage("Received null value for non-nullable field \"integer\""); + YamlUtils.toBeamRow(yamlString, schema); + } + + @Test + public void testInvalidMissingRequiredValues() { + String yamlString = "nullable_string:"; + Schema schema = + Schema.builder().addNullableStringField("nullable_string").addInt32Field("integer").build(); + thrown.expect(IllegalArgumentException.class); + thrown.expectMessage("Received null value for non-nullable field \"integer\""); + + YamlUtils.toBeamRow(yamlString, schema); + } + + @Test + public void testExtraFieldsAreIgnored() { + String yamlString = "field1: val1\n" + "field2: val2"; + Schema schema = Schema.builder().addStringField("field1").build(); + Row expectedRow = Row.withSchema(schema).withFieldValue("field1", "val1").build(); + + assertEquals(expectedRow, YamlUtils.toBeamRow(yamlString, schema)); + } + + @Test + public void testInvalidTopLevelArray() { + String invalidYaml = "- top_level_list" + "- another_list"; + Schema schema = Schema.builder().build(); + + thrown.expect(IllegalArgumentException.class); + thrown.expectMessage("Expected a YAML mapping"); + YamlUtils.toBeamRow(invalidYaml, schema); + } + + private static final Schema FLAT_SCHEMA = + Schema.builder() + .addByteField("byte_field") + .addInt16Field("int16_field") + .addInt32Field("int32_field") + .addInt64Field("int64_field") + .addFloatField("float_field") + .addDoubleField("double_field") + .addDecimalField("decimal_field") + .addBooleanField("boolean_field") + .addStringField("string_field") + .addByteArrayField("bytes_field") + .build(); + + private static final Row FLAT_ROW = + Row.withSchema(FLAT_SCHEMA) + .withFieldValue("byte_field", Byte.valueOf("123")) + .withFieldValue("int16_field", Short.valueOf("16")) + .withFieldValue("int32_field", 32) + .withFieldValue("int64_field", 64L) + .withFieldValue("float_field", 123.456F) + .withFieldValue("double_field", 456.789) + .withFieldValue("decimal_field", BigDecimal.valueOf(789.123)) + .withFieldValue("boolean_field", true) + .withFieldValue("string_field", "some string") + .withFieldValue("bytes_field", BaseEncoding.base64().decode("abc")) + .build(); + + private static final String FLAT_YAML = + "byte_field: 123\n" + + "int16_field: 16\n" + + "int32_field: 32\n" + + "int64_field: 64\n" + + "float_field: 123.456\n" + + "double_field: 456.789\n" + + "decimal_field: 789.123\n" + + "boolean_field: true\n" + + "string_field: some string\n" + + "bytes_field: abc"; + + @Test + public void testAllTypesFlat() { + assertEquals(FLAT_ROW, YamlUtils.toBeamRow(FLAT_YAML, FLAT_SCHEMA)); + } + + @Test + public void testAllTypesNested() { + String nestedFlatTypes = makeNested(FLAT_YAML); + String topLevelYaml = "top_string: abc\n" + "nested: \n" + nestedFlatTypes; + + Schema schema = + Schema.builder().addStringField("top_string").addRowField("nested", FLAT_SCHEMA).build(); + Row expectedRow = + Row.withSchema(schema) + .withFieldValue("top_string", "abc") + .withFieldValue("nested", FLAT_ROW) + .build(); + + assertEquals(expectedRow, YamlUtils.toBeamRow(topLevelYaml, schema)); + } + + private static final String INT_ARRAY_YAML = + "arr:\n" + " - 1\n" + " - 2\n" + " - 3\n" + " - 4\n" + " - 5\n"; + + private static final Schema INT_ARRAY_SCHEMA = + Schema.builder().addArrayField("arr", Schema.FieldType.INT32).build(); + + private static final Row INT_ARRAY_ROW = + Row.withSchema(INT_ARRAY_SCHEMA) + .withFieldValue("arr", IntStream.range(1, 6).boxed().collect(Collectors.toList())) + .build(); + + @Test + public void testArray() { + assertEquals(INT_ARRAY_ROW, YamlUtils.toBeamRow(INT_ARRAY_YAML, INT_ARRAY_SCHEMA)); + } + + @Test + public void testNestedArray() { + String nestedArray = makeNested(INT_ARRAY_YAML); + String yamlString = "str_field: some string\n" + "nested: \n" + nestedArray; + + Schema schema = + Schema.builder() + .addStringField("str_field") + .addRowField("nested", INT_ARRAY_SCHEMA) + .build(); + + Row expectedRow = + Row.withSchema(schema) + .withFieldValue("str_field", "some string") + .withFieldValue("nested", INT_ARRAY_ROW) + .build(); + + assertEquals(expectedRow, YamlUtils.toBeamRow(yamlString, schema)); + } +} diff --git a/sdks/java/managed/build.gradle b/sdks/java/managed/build.gradle new file mode 100644 index 000000000000..88e537d66f8c --- /dev/null +++ b/sdks/java/managed/build.gradle @@ -0,0 +1,37 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * License); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an AS IS BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +apply plugin: 'org.apache.beam.module' +applyJavaNature( + automaticModuleName: 'org.apache.beam.sdk.managed', +) + + +description = "Apache Beam :: SDKs :: Java :: Managed" +ext.summary = """Library that provides managed IOs.""" + + +dependencies { + implementation project(path: ":sdks:java:core", configuration: "shadow") + implementation library.java.vendored_guava_32_1_2_jre +// implementation library.java.vendored_grpc_1_60_1 + + testImplementation library.java.junit + testRuntimeOnly "org.yaml:snakeyaml:2.0" + testRuntimeOnly project(path: ":runners:direct-java", configuration: "shadow") +} diff --git a/sdks/java/managed/src/main/java/org/apache/beam/sdk/managed/Managed.java b/sdks/java/managed/src/main/java/org/apache/beam/sdk/managed/Managed.java new file mode 100644 index 000000000000..b2b010b1e434 --- /dev/null +++ b/sdks/java/managed/src/main/java/org/apache/beam/sdk/managed/Managed.java @@ -0,0 +1,195 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.beam.sdk.managed; + +import com.google.auto.value.AutoValue; +import java.util.ArrayList; +import java.util.List; +import java.util.Map; +import javax.annotation.Nullable; +import org.apache.beam.sdk.schemas.transforms.SchemaTransform; +import org.apache.beam.sdk.schemas.transforms.SchemaTransformProvider; +import org.apache.beam.sdk.schemas.utils.YamlUtils; +import org.apache.beam.sdk.values.PCollectionRowTuple; +import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.annotations.VisibleForTesting; +import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Preconditions; +import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.ImmutableMap; + +/** + * Top-level {@link org.apache.beam.sdk.transforms.PTransform}s that build and instantiate turnkey + * transforms. + * + *

Available transforms

+ * + *

This API currently supports two operations: {@link Read} and {@link Write}. Each one + * enumerates the available transforms in a {@code TRANSFORMS} map. + * + *

Building a Managed turnkey transform

+ * + *

Turnkey transforms are represented as {@link SchemaTransform}s, which means each one has a + * defined configuration. A given transform can be built with a {@code Map} that + * specifies arguments using like so: + * + *

{@code
+ * PCollectionRowTuple output = PCollectionRowTuple.empty(pipeline).apply(
+ *       Managed.read(ICEBERG)
+ *           .withConfig(ImmutableMap..builder()
+ *               .put("foo", "abc")
+ *               .put("bar", 123)
+ *               .build()));
+ * }
+ * + *

Instead of specifying configuration arguments directly in the code, one can provide the + * location to a YAML file that contains this information. Say we have the following YAML file: + * + *

{@code
+ * foo: "abc"
+ * bar: 123
+ * }
+ * + *

The file's path can be passed in to the Managed API like so: + * + *

{@code
+ * PCollectionRowTuple input = PCollectionRowTuple.of("input", pipeline.apply(Create.of(...)))
+ *
+ * PCollectionRowTuple output = input.apply(
+ *     Managed.write(ICEBERG)
+ *         .withConfigUrl());
+ * }
+ */ +public class Managed { + + // TODO: Dynamically generate a list of supported transforms + public static final String ICEBERG = "iceberg"; + + public static final Map READ_TRANSFORMS = + ImmutableMap.builder() + .put(ICEBERG, "beam:schematransform:org.apache.beam:iceberg_read:v1") + .build(); + public static final Map WRITE_TRANSFORMS = + ImmutableMap.builder() + .put(ICEBERG, "beam:schematransform:org.apache.beam:iceberg_write:v1") + .build(); + + /** + * Instantiates a {@link Managed.Read} transform for the specified source. The supported managed + * sources are: + * + *
    + *
  • {@link Managed#ICEBERG} : Read from Apache Iceberg + *
+ */ + public static ManagedTransform read(String source) { + + return new AutoValue_Managed_ManagedTransform.Builder() + .setIdentifier( + Preconditions.checkNotNull( + READ_TRANSFORMS.get(source.toLowerCase()), + "An unsupported source was specified: '%s'. Please specify one of the following sources: %s", + source, + READ_TRANSFORMS.keySet())) + .setSupportedIdentifiers(new ArrayList<>(READ_TRANSFORMS.values())) + .build(); + } + + /** + * Instantiates a {@link Managed.Write} transform for the specified sink. The supported managed + * sinks are: + * + *
    + *
  • {@link Managed#ICEBERG} : Write to Apache Iceberg + *
+ */ + public static ManagedTransform write(String sink) { + return new AutoValue_Managed_ManagedTransform.Builder() + .setIdentifier( + Preconditions.checkNotNull( + WRITE_TRANSFORMS.get(sink.toLowerCase()), + "An unsupported sink was specified: '%s'. Please specify one of the following sinks: %s", + sink, + WRITE_TRANSFORMS.keySet())) + .setSupportedIdentifiers(new ArrayList<>(WRITE_TRANSFORMS.values())) + .build(); + } + + @AutoValue + public abstract static class ManagedTransform extends SchemaTransform { + abstract String getIdentifier(); + + abstract @Nullable String getConfig(); + + abstract @Nullable String getConfigUrl(); + + @VisibleForTesting + abstract List getSupportedIdentifiers(); + + abstract Builder toBuilder(); + + @AutoValue.Builder + abstract static class Builder { + abstract Builder setIdentifier(String identifier); + + abstract Builder setConfig(@Nullable String config); + + abstract Builder setConfigUrl(@Nullable String configUrl); + + @VisibleForTesting + abstract Builder setSupportedIdentifiers(List supportedIdentifiers); + + abstract ManagedTransform build(); + } + + /** + * Use the input Map of configuration arguments to build and instantiate the underlying + * transform. The map can ignore nullable parameters, but needs to include all required + * parameters. Check the underlying transform's schema ({@link + * SchemaTransformProvider#configurationSchema()}) to see which parameters are available. + */ + public ManagedTransform withConfig(Map config) { + return toBuilder().setConfig(YamlUtils.yamlStringFromMap(config)).build(); + } + + /** + * Like {@link #withConfig(Map)}, but instead extracts the configuration arguments from a + * specified YAML file location. + */ + public ManagedTransform withConfigUrl(String configUrl) { + return toBuilder().setConfigUrl(configUrl).build(); + } + + @VisibleForTesting + ManagedTransform withSupportedIdentifiers(List supportedIdentifiers) { + return toBuilder().setSupportedIdentifiers(supportedIdentifiers).build(); + } + + @Override + public PCollectionRowTuple expand(PCollectionRowTuple input) { + ManagedSchemaTransformProvider.ManagedConfig managedConfig = + ManagedSchemaTransformProvider.ManagedConfig.builder() + .setTransformIdentifier(getIdentifier()) + .setConfig(getConfig()) + .setConfigUrl(getConfigUrl()) + .build(); + + SchemaTransform underlyingTransform = + new ManagedSchemaTransformProvider(getSupportedIdentifiers()).from(managedConfig); + + return input.apply(underlyingTransform); + } + } +} diff --git a/sdks/java/managed/src/main/java/org/apache/beam/sdk/managed/ManagedSchemaTransformProvider.java b/sdks/java/managed/src/main/java/org/apache/beam/sdk/managed/ManagedSchemaTransformProvider.java new file mode 100644 index 000000000000..1ee2b11a90ff --- /dev/null +++ b/sdks/java/managed/src/main/java/org/apache/beam/sdk/managed/ManagedSchemaTransformProvider.java @@ -0,0 +1,183 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.beam.sdk.managed; + +import static org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Preconditions.checkArgument; + +import com.google.auto.service.AutoService; +import com.google.auto.value.AutoValue; +import java.io.IOException; +import java.nio.ByteBuffer; +import java.nio.charset.StandardCharsets; +import java.util.Collection; +import java.util.HashMap; +import java.util.Map; +import java.util.ServiceLoader; +import javax.annotation.Nullable; +import org.apache.beam.sdk.io.FileSystems; +import org.apache.beam.sdk.io.fs.MatchResult; +import org.apache.beam.sdk.schemas.AutoValueSchema; +import org.apache.beam.sdk.schemas.Schema; +import org.apache.beam.sdk.schemas.annotations.DefaultSchema; +import org.apache.beam.sdk.schemas.annotations.SchemaFieldDescription; +import org.apache.beam.sdk.schemas.transforms.SchemaTransform; +import org.apache.beam.sdk.schemas.transforms.SchemaTransformProvider; +import org.apache.beam.sdk.schemas.transforms.TypedSchemaTransformProvider; +import org.apache.beam.sdk.schemas.utils.YamlUtils; +import org.apache.beam.sdk.values.PCollectionRowTuple; +import org.apache.beam.sdk.values.Row; +import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.annotations.VisibleForTesting; +import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Preconditions; +import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Strings; + +@AutoService(SchemaTransformProvider.class) +public class ManagedSchemaTransformProvider + extends TypedSchemaTransformProvider { + + @Override + public String identifier() { + return "beam:schematransform:org.apache.beam:managed:v1"; + } + + private final Map schemaTransformProviders = new HashMap<>(); + + public ManagedSchemaTransformProvider() {} + + ManagedSchemaTransformProvider(Collection supportedIdentifiers) { + try { + for (SchemaTransformProvider schemaTransformProvider : + ServiceLoader.load(SchemaTransformProvider.class)) { + if (schemaTransformProviders.containsKey(schemaTransformProvider.identifier())) { + throw new IllegalArgumentException( + "Found multiple SchemaTransformProvider implementations with the same identifier " + + schemaTransformProvider.identifier()); + } + schemaTransformProviders.put(schemaTransformProvider.identifier(), schemaTransformProvider); + } + } catch (Exception e) { + throw new RuntimeException(e.getMessage()); + } + + schemaTransformProviders.entrySet().removeIf(e -> !supportedIdentifiers.contains(e.getKey())); + } + + @DefaultSchema(AutoValueSchema.class) + @AutoValue + @VisibleForTesting + abstract static class ManagedConfig { + public static Builder builder() { + return new AutoValue_ManagedSchemaTransformProvider_ManagedConfig.Builder(); + } + + @SchemaFieldDescription("Identifier of the underlying IO to instantiate.") + public abstract String getTransformIdentifier(); + + @SchemaFieldDescription("URL path to the YAML config file used to build the underlying IO.") + public abstract @Nullable String getConfigUrl(); + + @SchemaFieldDescription("YAML string config used to build the underlying IO.") + public abstract @Nullable String getConfig(); + + @AutoValue.Builder + public abstract static class Builder { + public abstract Builder setTransformIdentifier(String identifier); + + public abstract Builder setConfigUrl(@Nullable String configUrl); + + public abstract Builder setConfig(@Nullable String config); + + public abstract ManagedConfig build(); + } + + protected void validate() { + boolean configExists = !Strings.isNullOrEmpty(getConfig()); + boolean configUrlExists = !Strings.isNullOrEmpty(getConfigUrl()); + checkArgument( + !(configExists && configUrlExists) && (configExists || configUrlExists), + "Please specify a config or a config URL, but not both."); + } + } + + @Override + protected SchemaTransform from(ManagedConfig managedConfig) { + managedConfig.validate(); + SchemaTransformProvider schemaTransformProvider = + Preconditions.checkNotNull( + schemaTransformProviders.get(managedConfig.getTransformIdentifier()), + "Could not find transform with identifier %s, or it may not be supported", + managedConfig.getTransformIdentifier()); + + // parse config before expansion to check if it matches underlying transform's config schema + Schema transformConfigSchema = schemaTransformProvider.configurationSchema(); + Row transformConfig; + try { + transformConfig = getRowConfig(managedConfig, transformConfigSchema); + } catch (Exception e) { + throw new IllegalArgumentException( + String.format( + "Specified configuration does not align with the underlying transform's configuration schema [%s].", + transformConfigSchema), + e); + } + + return new ManagedSchemaTransform(transformConfig, schemaTransformProvider); + } + + private static class ManagedSchemaTransform extends SchemaTransform { + private final Row transformConfig; + private final SchemaTransformProvider underlyingTransformProvider; + + ManagedSchemaTransform( + Row transformConfig, SchemaTransformProvider underlyingTransformProvider) { + this.transformConfig = transformConfig; + this.underlyingTransformProvider = underlyingTransformProvider; + } + + @Override + public PCollectionRowTuple expand(PCollectionRowTuple input) { + SchemaTransform underlyingTransform = underlyingTransformProvider.from(transformConfig); + + return input.apply(underlyingTransform); + } + } + + @VisibleForTesting + static Row getRowConfig(ManagedConfig config, Schema transformSchema) { + String transformYamlConfig; + if (!Strings.isNullOrEmpty(config.getConfigUrl())) { + try { + MatchResult.Metadata fileMetaData = + FileSystems.matchSingleFileSpec(Preconditions.checkNotNull(config.getConfigUrl())); + ByteBuffer buffer = ByteBuffer.allocate((int) fileMetaData.sizeBytes()); + FileSystems.open(fileMetaData.resourceId()).read(buffer); + transformYamlConfig = new String(buffer.array(), StandardCharsets.UTF_8); + } catch (IOException e) { + throw new RuntimeException(e); + } + } else { + transformYamlConfig = config.getConfig(); + } + + return YamlUtils.toBeamRow(transformYamlConfig, transformSchema, true); + } + + @VisibleForTesting + Map getAllProviders() { + return schemaTransformProviders; + } +} diff --git a/sdks/java/managed/src/main/java/org/apache/beam/sdk/managed/package-info.java b/sdks/java/managed/src/main/java/org/apache/beam/sdk/managed/package-info.java new file mode 100644 index 000000000000..d129e4a7a225 --- /dev/null +++ b/sdks/java/managed/src/main/java/org/apache/beam/sdk/managed/package-info.java @@ -0,0 +1,20 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +/** Managed reads and writes. */ +package org.apache.beam.sdk.managed; diff --git a/sdks/java/managed/src/test/java/org/apache/beam/sdk/managed/ManagedSchemaTransformProviderTest.java b/sdks/java/managed/src/test/java/org/apache/beam/sdk/managed/ManagedSchemaTransformProviderTest.java new file mode 100644 index 000000000000..0c495d0d2c5c --- /dev/null +++ b/sdks/java/managed/src/test/java/org/apache/beam/sdk/managed/ManagedSchemaTransformProviderTest.java @@ -0,0 +1,103 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.beam.sdk.managed; + +import static org.apache.beam.sdk.managed.ManagedSchemaTransformProvider.ManagedConfig; +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertTrue; + +import java.net.URISyntaxException; +import java.nio.file.Paths; +import java.util.Arrays; +import org.apache.beam.sdk.schemas.Schema; +import org.apache.beam.sdk.values.Row; +import org.junit.Rule; +import org.junit.Test; +import org.junit.rules.ExpectedException; +import org.junit.runner.RunWith; +import org.junit.runners.JUnit4; + +@RunWith(JUnit4.class) +public class ManagedSchemaTransformProviderTest { + @Rule public transient ExpectedException thrown = ExpectedException.none(); + + @Test + public void testFailWhenNoConfigSpecified() { + ManagedSchemaTransformProvider.ManagedConfig config = + ManagedSchemaTransformProvider.ManagedConfig.builder() + .setTransformIdentifier("some identifier") + .build(); + + thrown.expect(IllegalArgumentException.class); + thrown.expectMessage("Please specify a config or a config URL, but not both"); + config.validate(); + } + + @Test + public void testGetRowFromYamlConfig() { + String yamlString = "extra_string: abc\n" + "extra_integer: 123"; + ManagedConfig config = + ManagedConfig.builder() + .setTransformIdentifier(TestSchemaTransformProvider.IDENTIFIER) + .setConfig(yamlString) + .build(); + Schema configSchema = new TestSchemaTransformProvider().configurationSchema(); + Row expectedRow = + Row.withSchema(configSchema) + .withFieldValue("extraString", "abc") + .withFieldValue("extraInteger", 123) + .build(); + Row configRow = + ManagedSchemaTransformProvider.getRowConfig( + config, new TestSchemaTransformProvider().configurationSchema()); + + assertEquals(expectedRow, configRow); + } + + @Test + public void testGetRowFromConfigUrl() throws URISyntaxException { + String yamlConfigPath = + Paths.get(getClass().getClassLoader().getResource("test_config.yaml").toURI()) + .toFile() + .getAbsolutePath(); + ManagedConfig config = + ManagedConfig.builder() + .setTransformIdentifier(TestSchemaTransformProvider.IDENTIFIER) + .setConfigUrl(yamlConfigPath) + .build(); + Schema configSchema = new TestSchemaTransformProvider().configurationSchema(); + Row expectedRow = + Row.withSchema(configSchema) + .withFieldValue("extraString", "abc") + .withFieldValue("extraInteger", 123) + .build(); + Row configRow = + ManagedSchemaTransformProvider.getRowConfig( + config, new TestSchemaTransformProvider().configurationSchema()); + + assertEquals(expectedRow, configRow); + } + + @Test + public void testDiscoverTestProvider() { + ManagedSchemaTransformProvider provider = + new ManagedSchemaTransformProvider(Arrays.asList(TestSchemaTransformProvider.IDENTIFIER)); + + assertTrue(provider.getAllProviders().containsKey(TestSchemaTransformProvider.IDENTIFIER)); + } +} diff --git a/sdks/java/managed/src/test/java/org/apache/beam/sdk/managed/ManagedTest.java b/sdks/java/managed/src/test/java/org/apache/beam/sdk/managed/ManagedTest.java new file mode 100644 index 000000000000..ceb71a06f33c --- /dev/null +++ b/sdks/java/managed/src/test/java/org/apache/beam/sdk/managed/ManagedTest.java @@ -0,0 +1,114 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.beam.sdk.managed; + +import java.nio.file.Paths; +import java.util.Arrays; +import java.util.List; +import java.util.stream.Collectors; +import org.apache.beam.sdk.schemas.Schema; +import org.apache.beam.sdk.testing.PAssert; +import org.apache.beam.sdk.testing.TestPipeline; +import org.apache.beam.sdk.transforms.Create; +import org.apache.beam.sdk.values.PCollection; +import org.apache.beam.sdk.values.PCollectionRowTuple; +import org.apache.beam.sdk.values.Row; +import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.ImmutableMap; +import org.junit.Rule; +import org.junit.Test; +import org.junit.rules.ExpectedException; +import org.junit.runner.RunWith; +import org.junit.runners.JUnit4; + +@RunWith(JUnit4.class) +public class ManagedTest { + @Rule public transient ExpectedException thrown = ExpectedException.none(); + + @Test + public void testInvalidTransform() { + thrown.expect(NullPointerException.class); + thrown.expectMessage("An unsupported source was specified"); + Managed.read("nonexistent-source"); + + thrown.expect(NullPointerException.class); + thrown.expectMessage("An unsupported sink was specified"); + Managed.write("nonexistent-sink"); + } + + @Rule public TestPipeline pipeline = TestPipeline.create(); + + private static final Schema SCHEMA = + Schema.builder().addStringField("str").addInt32Field("int").build(); + private static final List ROWS = + Arrays.asList( + Row.withSchema(SCHEMA).withFieldValue("str", "a").withFieldValue("int", 1).build(), + Row.withSchema(SCHEMA).withFieldValue("str", "b").withFieldValue("int", 2).build(), + Row.withSchema(SCHEMA).withFieldValue("str", "c").withFieldValue("int", 3).build()); + + public void runTestProviderTest(Managed.ManagedTransform writeOp) { + PCollection rows = + PCollectionRowTuple.of("input", pipeline.apply(Create.of(ROWS)).setRowSchema(SCHEMA)) + .apply(writeOp) + .get("output"); + + Schema outputSchema = rows.getSchema(); + PAssert.that(rows) + .containsInAnyOrder( + ROWS.stream() + .map( + row -> + Row.withSchema(outputSchema) + .addValues(row.getValues()) + .addValue("abc") + .addValue(123) + .build()) + .collect(Collectors.toList())); + pipeline.run(); + } + + @Test + public void testManagedTestProviderWithConfigMap() { + Managed.ManagedTransform writeOp = + Managed.write(Managed.ICEBERG) + .toBuilder() + .setIdentifier(TestSchemaTransformProvider.IDENTIFIER) + .build() + .withSupportedIdentifiers(Arrays.asList(TestSchemaTransformProvider.IDENTIFIER)) + .withConfig(ImmutableMap.of("extra_string", "abc", "extra_integer", 123)); + + runTestProviderTest(writeOp); + } + + @Test + public void testManagedTestProviderWithConfigFile() throws Exception { + String yamlConfigPath = + Paths.get(getClass().getClassLoader().getResource("test_config.yaml").toURI()) + .toFile() + .getAbsolutePath(); + + Managed.ManagedTransform writeOp = + Managed.write(Managed.ICEBERG) + .toBuilder() + .setIdentifier(TestSchemaTransformProvider.IDENTIFIER) + .build() + .withSupportedIdentifiers(Arrays.asList(TestSchemaTransformProvider.IDENTIFIER)) + .withConfigUrl(yamlConfigPath); + + runTestProviderTest(writeOp); + } +} diff --git a/sdks/java/managed/src/test/java/org/apache/beam/sdk/managed/TestSchemaTransformProvider.java b/sdks/java/managed/src/test/java/org/apache/beam/sdk/managed/TestSchemaTransformProvider.java new file mode 100644 index 000000000000..136d98d468d0 --- /dev/null +++ b/sdks/java/managed/src/test/java/org/apache/beam/sdk/managed/TestSchemaTransformProvider.java @@ -0,0 +1,98 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.beam.sdk.managed; + +import com.google.auto.service.AutoService; +import com.google.auto.value.AutoValue; +import org.apache.beam.sdk.schemas.AutoValueSchema; +import org.apache.beam.sdk.schemas.Schema; +import org.apache.beam.sdk.schemas.annotations.DefaultSchema; +import org.apache.beam.sdk.schemas.annotations.SchemaFieldDescription; +import org.apache.beam.sdk.schemas.transforms.SchemaTransform; +import org.apache.beam.sdk.schemas.transforms.SchemaTransformProvider; +import org.apache.beam.sdk.schemas.transforms.TypedSchemaTransformProvider; +import org.apache.beam.sdk.transforms.MapElements; +import org.apache.beam.sdk.values.PCollection; +import org.apache.beam.sdk.values.PCollectionRowTuple; +import org.apache.beam.sdk.values.Row; +import org.apache.beam.sdk.values.TypeDescriptors; + +@AutoService(SchemaTransformProvider.class) +public class TestSchemaTransformProvider + extends TypedSchemaTransformProvider { + static final String IDENTIFIER = "beam:schematransform:org.apache.beam:test_transform:v1"; + + @DefaultSchema(AutoValueSchema.class) + @AutoValue + public abstract static class Config { + public static Builder builder() { + return new AutoValue_TestSchemaTransformProvider_Config.Builder(); + } + + @SchemaFieldDescription("String to add to each row element.") + public abstract String getExtraString(); + + @SchemaFieldDescription("Integer to add to each row element.") + public abstract Integer getExtraInteger(); + + @AutoValue.Builder + public abstract static class Builder { + public abstract Builder setExtraString(String extraString); + + public abstract Builder setExtraInteger(Integer extraInteger); + + public abstract Config build(); + } + } + + @Override + public SchemaTransform from(Config config) { + String extraString = config.getExtraString(); + Integer extraInteger = config.getExtraInteger(); + return new SchemaTransform() { + @Override + public PCollectionRowTuple expand(PCollectionRowTuple input) { + Schema schema = + Schema.builder() + .addFields(input.get("input").getSchema().getFields()) + .addStringField("extra_string") + .addInt32Field("extra_integer") + .build(); + PCollection rows = + input + .get("input") + .apply( + MapElements.into(TypeDescriptors.rows()) + .via( + row -> + Row.withSchema(schema) + .addValues(row.getValues()) + .addValue(extraString) + .addValue(extraInteger) + .build())) + .setRowSchema(schema); + return PCollectionRowTuple.of("output", rows); + } + }; + } + + @Override + public String identifier() { + return IDENTIFIER; + } +} diff --git a/sdks/java/managed/src/test/resources/test_config.yaml b/sdks/java/managed/src/test/resources/test_config.yaml new file mode 100644 index 000000000000..7725c32b348e --- /dev/null +++ b/sdks/java/managed/src/test/resources/test_config.yaml @@ -0,0 +1,21 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. +# + +extra_string: "abc" +extra_integer: 123 \ No newline at end of file diff --git a/settings.gradle.kts b/settings.gradle.kts index ec11fd32fdd3..1e52e425b215 100644 --- a/settings.gradle.kts +++ b/settings.gradle.kts @@ -353,3 +353,5 @@ include("sdks:java:io:kafka:kafka-100") findProject(":sdks:java:io:kafka:kafka-100")?.name = "kafka-100" include("sdks:java:io:kafka:kafka-01103") findProject(":sdks:java:io:kafka:kafka-01103")?.name = "kafka-01103" +include("sdks:java:managed") +findProject(":sdks:java:managed")?.name = "managed"