diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/RowStringInterpolator.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/RowStringInterpolator.java
new file mode 100644
index 000000000000..1f095522dd8b
--- /dev/null
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/RowStringInterpolator.java
@@ -0,0 +1,165 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.util;
+
+import static org.apache.beam.sdk.util.Preconditions.checkStateNotNull;
+
+import java.io.Serializable;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Set;
+import java.util.regex.Matcher;
+import java.util.regex.Pattern;
+import java.util.stream.Collectors;
+import org.apache.beam.sdk.schemas.Schema;
+import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
+import org.apache.beam.sdk.transforms.windowing.PaneInfo;
+import org.apache.beam.sdk.values.Row;
+import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.MoreObjects;
+import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Preconditions;
+import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.Sets;
+import org.checkerframework.checker.nullness.qual.Nullable;
+import org.joda.time.Instant;
+
+/**
+ * A utility that interpolates values in a pre-determined {@link String} using an input Beam {@link
+ * Row}.
+ *
+ *
The {@link RowStringInterpolator} looks for field names specified inside {curly braces}. For
+ * example, if the interpolator is configured with the String {@code "unified {foo} and streaming"},
+ * it will look for a field name {@code "foo"} in the input {@link Row} and substitute in that
+ * value. If a {@link RowStringInterpolator} is configured with a template String that contains no
+ * placeholders (i.e. no curly braces), it will simply return that String, untouched.
+ *
+ *
Nested fields can be specified using dot-notation (e.g. {@code "top.middle.nested"}).
+ *
+ *
Configure a {@link RowStringInterpolator} like so:
+ *
+ *
{@code
+ * String template = "unified {foo} and {bar.baz}!";
+ * Row inputRow = {foo: "batch", bar: {baz: "streaming"}, ...};
+ *
+ * RowStringInterpolator interpolator = new RowStringInterpolator(template, beamSchema);
+ * String output = interpolator.interpolate(inputRow, window, paneInfo, timestamp);
+ * // output --> "unified batch and streaming!"
+ * }
+ *
+ * Additionally, {@link #interpolate(Row, BoundedWindow, PaneInfo, Instant)} can be used in
+ * streaming scenarios to substitute windowing metadata into the template String. To make use of
+ * this, use the relevant placeholder:
+ *
+ *
+ * - $WINDOW: the window's string representation
+ *
- $PANE_INDEX: the pane's index
+ *
- $YYYY: the element timestamp's year
+ *
- $MM: the element timestamp's month
+ *
- $DD: the element timestamp's day
+ *
+ *
+ * For example, your String template can look like:
+ *
+ *
{@code "unified {foo} and {bar} since {$YYYY}-{$MM}!"}
+ */
+public class RowStringInterpolator implements Serializable {
+ private final String template;
+ private final Set fieldsToReplace;
+ // Represents the string representation of the element's window
+ public static final String WINDOW = "$WINDOW";
+ public static final String PANE_INDEX = "$PANE_INDEX";
+ // Represents the element's pane index
+ public static final String YYYY = "$YYYY";
+ public static final String MM = "$MM";
+ public static final String DD = "$DD";
+ private static final Set WINDOWING_METADATA =
+ Sets.newHashSet(WINDOW, PANE_INDEX, YYYY, MM, DD);
+ private static final Pattern TEMPLATE_PATTERN = Pattern.compile("\\{(.+?)}");
+
+ /**
+ * @param template a String template, potentially with placeholders in the form of curly braces,
+ * e.g. {@code "my {foo} template"}. During interpolation, these placeholders are replaced
+ * with values in the Beam Row. For more details and examples, refer to the top-level
+ * documentation.
+ * @param rowSchema {@link Row}s used for interpolation are expected to be compatible with this
+ * {@link Schema}.
+ */
+ public RowStringInterpolator(String template, Schema rowSchema) {
+ this.template = template;
+
+ Matcher m = TEMPLATE_PATTERN.matcher(template);
+ fieldsToReplace = new HashSet<>();
+ while (m.find()) {
+ fieldsToReplace.add(checkStateNotNull(m.group(1)));
+ }
+
+ List rowFields =
+ fieldsToReplace.stream()
+ .filter(f -> !WINDOWING_METADATA.contains(f))
+ .collect(Collectors.toList());
+
+ RowFilter.validateSchemaContainsFields(rowSchema, rowFields, "string interpolation");
+ }
+
+ /**
+ * Performs string interpolation on the template using values from the input {@link Row} and its
+ * windowing metadata.
+ */
+ public String interpolate(Row row, BoundedWindow window, PaneInfo paneInfo, Instant timestamp) {
+ String interpolated = this.template;
+ for (String field : fieldsToReplace) {
+ Object val;
+ switch (field) {
+ case WINDOW:
+ val = window.toString();
+ break;
+ case PANE_INDEX:
+ val = paneInfo.getIndex();
+ break;
+ case YYYY:
+ val = timestamp.getChronology().year().get(timestamp.getMillis());
+ break;
+ case MM:
+ val = timestamp.getChronology().monthOfYear().get(timestamp.getMillis());
+ break;
+ case DD:
+ val = timestamp.getChronology().dayOfMonth().get(timestamp.getMillis());
+ break;
+ default:
+ val = MoreObjects.firstNonNull(getValue(row, field), "");
+ break;
+ }
+
+ interpolated = interpolated.replace("{" + field + "}", String.valueOf(val));
+ }
+ return interpolated;
+ }
+
+ private @Nullable Object getValue(@Nullable Row row, String fieldPath) {
+ if (row == null) {
+ return null;
+ }
+ int dotIndex = fieldPath.indexOf('.');
+ String field = dotIndex == -1 ? fieldPath : fieldPath.substring(0, dotIndex);
+ Preconditions.checkArgument(
+ row.getSchema().hasField(field), "Invalid row does not contain field '%s'.", field);
+
+ if (dotIndex == -1) {
+ return row.getValue(field);
+ }
+ return getValue(row.getRow(field), fieldPath.substring(dotIndex + 1));
+ }
+}
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/util/RowStringInterpolatorTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/util/RowStringInterpolatorTest.java
new file mode 100644
index 000000000000..0b1295c38533
--- /dev/null
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/util/RowStringInterpolatorTest.java
@@ -0,0 +1,191 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.util;
+
+import static org.junit.Assert.assertEquals;
+
+import java.util.Arrays;
+import org.apache.beam.sdk.schemas.Schema;
+import org.apache.beam.sdk.transforms.windowing.GlobalWindow;
+import org.apache.beam.sdk.transforms.windowing.PaneInfo;
+import org.apache.beam.sdk.values.Row;
+import org.joda.time.DateTime;
+import org.joda.time.Instant;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.ExpectedException;
+
+/** Test class for {@link RowStringInterpolator}. */
+public class RowStringInterpolatorTest {
+ @Rule public ExpectedException thrown = ExpectedException.none();
+
+ private static final Schema DOUBLY_NESTED_ROW_SCHEMA =
+ Schema.builder()
+ .addStringField("doubly_nested_str")
+ .addInt32Field("doubly_nested_int")
+ .build();
+
+ private static final Schema NESTED_ROW_SCHEMA =
+ Schema.builder()
+ .addStringField("nested_str")
+ .addInt32Field("nested_int")
+ .addFloatField("nested_float")
+ .addRowField("nested_row", DOUBLY_NESTED_ROW_SCHEMA)
+ .build();
+ private static final Schema ROW_SCHEMA =
+ Schema.builder()
+ .addStringField("str")
+ .addBooleanField("bool")
+ .addInt32Field("int")
+ .addNullableInt32Field("nullable_int")
+ .addArrayField("arr_int", Schema.FieldType.INT32)
+ .addRowField("row", NESTED_ROW_SCHEMA)
+ .addNullableRowField("nullable_row", NESTED_ROW_SCHEMA)
+ .build();
+
+ @Test
+ public void testInvalidRowThrowsHelpfulError() {
+ String template = "foo {str}";
+ RowStringInterpolator interpolator = new RowStringInterpolator(template, ROW_SCHEMA);
+
+ Row invalidRow = Row.nullRow(Schema.builder().addNullableStringField("xyz").build());
+
+ thrown.expect(IllegalArgumentException.class);
+ thrown.expectMessage("Invalid row does not contain field 'str'.");
+
+ interpolator.interpolate(invalidRow, null, null, null);
+ }
+
+ @Test
+ public void testInvalidRowThrowsHelpfulErrorForNestedFields() {
+ String template = "foo {row.nested_int}";
+ RowStringInterpolator interpolator = new RowStringInterpolator(template, ROW_SCHEMA);
+
+ Schema nestedSchema = Schema.builder().addNullableStringField("xyz").build();
+ Row invalidRow =
+ Row.withSchema(Schema.builder().addNullableRowField("row", nestedSchema).build())
+ .addValue(Row.nullRow(nestedSchema))
+ .build();
+
+ thrown.expect(IllegalArgumentException.class);
+ thrown.expectMessage("Invalid row does not contain field 'nested_int'.");
+
+ interpolator.interpolate(invalidRow, null, null, null);
+ }
+
+ @Test
+ public void testInvalidRowThrowsHelpfulErrorForDoublyNestedFields() {
+ String template = "foo {row.nested_row.doubly_nested_int}";
+ RowStringInterpolator interpolator = new RowStringInterpolator(template, ROW_SCHEMA);
+
+ Schema doublyNestedSchema = Schema.builder().addNullableStringField("xyz").build();
+ Schema nestedSchema =
+ Schema.builder().addNullableRowField("nested_row", doublyNestedSchema).build();
+ Row invalidRow =
+ Row.withSchema(Schema.builder().addNullableRowField("row", doublyNestedSchema).build())
+ .addValue(
+ Row.withSchema(nestedSchema).addValue(Row.nullRow(doublyNestedSchema)).build())
+ .build();
+
+ thrown.expect(IllegalArgumentException.class);
+ thrown.expectMessage("Invalid row does not contain field 'doubly_nested_int'.");
+
+ interpolator.interpolate(invalidRow, null, null, null);
+ }
+
+ private static final Row ROW =
+ Row.withSchema(ROW_SCHEMA)
+ .addValue("str_value")
+ .addValue(true)
+ .addValue(123)
+ .addValue(null)
+ .addValue(Arrays.asList(1, 2, 3, 4, 5))
+ .addValue(
+ Row.withSchema(NESTED_ROW_SCHEMA)
+ .addValue("nested_str_value")
+ .addValue(456)
+ .addValue(1.234f)
+ .addValue(
+ Row.withSchema(DOUBLY_NESTED_ROW_SCHEMA)
+ .addValue("doubly_nested_str_value")
+ .addValue(789)
+ .build())
+ .build())
+ .addValue(null)
+ .build();
+
+ @Test
+ public void testTopLevelInterpolation() {
+ String template = "foo {str}, bar {bool}, baz {int}, xyz {nullable_int}";
+ RowStringInterpolator interpolator = new RowStringInterpolator(template, ROW_SCHEMA);
+
+ String output = interpolator.interpolate(ROW, null, null, null);
+
+ assertEquals("foo str_value, bar true, baz 123, xyz ", output);
+ }
+
+ @Test
+ public void testNestedLevelInterpolation() {
+ String template = "foo {str}, bar {row.nested_str}, baz {row.nested_float}";
+ RowStringInterpolator interpolator = new RowStringInterpolator(template, ROW_SCHEMA);
+
+ String output = interpolator.interpolate(ROW, null, null, null);
+
+ assertEquals("foo str_value, bar nested_str_value, baz 1.234", output);
+ }
+
+ @Test
+ public void testDoublyNestedInterpolation() {
+ String template =
+ "foo {str}, bar {row.nested_row.doubly_nested_str}, baz {row.nested_row.doubly_nested_int}";
+ RowStringInterpolator interpolator = new RowStringInterpolator(template, ROW_SCHEMA);
+
+ String output = interpolator.interpolate(ROW, null, null, null);
+
+ assertEquals("foo str_value, bar doubly_nested_str_value, baz 789", output);
+ }
+
+ @Test
+ public void testInterpolateWindowingInformation() {
+ String template =
+ String.format(
+ "str: {str}, window: {%s}, pane: {%s}, year: {%s}, month: {%s}, day: {%s}",
+ RowStringInterpolator.WINDOW,
+ RowStringInterpolator.PANE_INDEX,
+ RowStringInterpolator.YYYY,
+ RowStringInterpolator.MM,
+ RowStringInterpolator.DD);
+
+ RowStringInterpolator interpolator = new RowStringInterpolator(template, ROW_SCHEMA);
+
+ Instant instant = new DateTime(2024, 8, 28, 12, 0).toInstant();
+
+ String output =
+ interpolator.interpolate(
+ ROW,
+ GlobalWindow.INSTANCE,
+ PaneInfo.createPane(false, false, PaneInfo.Timing.ON_TIME, 2, 0),
+ instant);
+ String expected =
+ String.format(
+ "str: str_value, window: %s, pane: 2, year: 2024, month: 8, day: 28",
+ GlobalWindow.INSTANCE);
+
+ assertEquals(expected, output);
+ }
+}