Skip to content

Commit

Permalink
Add Pubsub lite write metrics and DLQ support (#26482)
Browse files Browse the repository at this point in the history
* init

* fixup

* no data being processed. debugging

* added element counter

* only error counter and dlq. Removed element counter

* add unit test

* change name

* resolve conflicts
  • Loading branch information
nickuncaged1201 authored May 9, 2023
1 parent f70e9bf commit 86f56e9
Show file tree
Hide file tree
Showing 2 changed files with 154 additions and 16 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -25,11 +25,14 @@
import com.google.cloud.pubsublite.TopicPath;
import com.google.cloud.pubsublite.proto.PubSubMessage;
import com.google.protobuf.ByteString;
import java.nio.charset.StandardCharsets;
import java.util.Collections;
import java.util.List;
import java.util.Objects;
import java.util.Set;
import org.apache.beam.sdk.extensions.avro.schemas.utils.AvroUtils;
import org.apache.beam.sdk.metrics.Counter;
import org.apache.beam.sdk.metrics.Metrics;
import org.apache.beam.sdk.schemas.AutoValueSchema;
import org.apache.beam.sdk.schemas.Schema;
import org.apache.beam.sdk.schemas.annotations.DefaultSchema;
Expand All @@ -38,16 +41,22 @@
import org.apache.beam.sdk.schemas.transforms.SchemaTransformProvider;
import org.apache.beam.sdk.schemas.transforms.TypedSchemaTransformProvider;
import org.apache.beam.sdk.schemas.utils.JsonUtils;
import org.apache.beam.sdk.transforms.MapElements;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.DoFn.ProcessElement;
import org.apache.beam.sdk.transforms.PTransform;
import org.apache.beam.sdk.transforms.ParDo;
import org.apache.beam.sdk.transforms.SerializableFunction;
import org.apache.beam.sdk.values.PCollectionRowTuple;
import org.apache.beam.sdk.values.PCollectionTuple;
import org.apache.beam.sdk.values.Row;
import org.apache.beam.sdk.values.TypeDescriptor;
import org.apache.beam.sdk.values.TupleTag;
import org.apache.beam.sdk.values.TupleTagList;
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.Sets;
import org.checkerframework.checker.initialization.qual.Initialized;
import org.checkerframework.checker.nullness.qual.NonNull;
import org.checkerframework.checker.nullness.qual.UnknownKeyFor;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

@AutoService(SchemaTransformProvider.class)
public class PubsubLiteWriteSchemaTransformProvider
Expand All @@ -57,13 +66,57 @@ public class PubsubLiteWriteSchemaTransformProvider
public static final String SUPPORTED_FORMATS_STR = "JSON,AVRO";
public static final Set<String> SUPPORTED_FORMATS =
Sets.newHashSet(SUPPORTED_FORMATS_STR.split(","));
public static final TupleTag<PubSubMessage> OUTPUT_TAG = new TupleTag<PubSubMessage>() {};
public static final TupleTag<Row> ERROR_TAG = new TupleTag<Row>() {};
public static final Schema ERROR_SCHEMA =
Schema.builder().addStringField("error").addNullableByteArrayField("row").build();
private static final Logger LOG =
LoggerFactory.getLogger(PubsubLiteWriteSchemaTransformProvider.class);

@Override
protected @UnknownKeyFor @NonNull @Initialized Class<PubsubLiteWriteSchemaTransformConfiguration>
configurationClass() {
return PubsubLiteWriteSchemaTransformConfiguration.class;
}

public static class ErrorCounterFn extends DoFn<Row, PubSubMessage> {
private SerializableFunction<Row, byte[]> toBytesFn;
private Counter errorCounter;
private long errorsInBundle = 0L;

public ErrorCounterFn(String name, SerializableFunction<Row, byte[]> toBytesFn) {
this.toBytesFn = toBytesFn;
errorCounter = Metrics.counter(PubsubLiteWriteSchemaTransformProvider.class, name);
}

@ProcessElement
public void process(@DoFn.Element Row row, MultiOutputReceiver receiver) {
try {
PubSubMessage message =
PubSubMessage.newBuilder()
.setData(ByteString.copyFrom(Objects.requireNonNull(toBytesFn.apply(row))))
.build();

receiver.get(OUTPUT_TAG).output(message);
} catch (Exception e) {
errorsInBundle += 1;
LOG.warn("Error while parsing the element", e);
receiver
.get(ERROR_TAG)
.output(
Row.withSchema(ERROR_SCHEMA)
.addValues(e.toString(), row.toString().getBytes(StandardCharsets.UTF_8))
.build());
}
}

@FinishBundle
public void finish() {
errorCounter.inc(errorsInBundle);
errorsInBundle = 0L;
}
}

@Override
public @UnknownKeyFor @NonNull @Initialized SchemaTransform from(
PubsubLiteWriteSchemaTransformConfiguration configuration) {
Expand Down Expand Up @@ -92,18 +145,17 @@ public PCollectionRowTuple expand(PCollectionRowTuple input) {
configuration.getFormat().equals("JSON")
? JsonUtils.getRowToJsonBytesFunction(inputSchema)
: AvroUtils.getRowToAvroBytesFunction(inputSchema);
input
.get("input")
.apply(
"Map Rows to PubSubMessages",
MapElements.into(TypeDescriptor.of(PubSubMessage.class))
.via(
row ->
PubSubMessage.newBuilder()
.setData(
ByteString.copyFrom(
Objects.requireNonNull(toBytesFn.apply(row))))
.build()))

PCollectionTuple outputTuple =
input
.get("input")
.apply(
"Map Rows to PubSubMessages",
ParDo.of(new ErrorCounterFn("PubSubLite-write-error-counter", toBytesFn))
.withOutputTags(OUTPUT_TAG, TupleTagList.of(ERROR_TAG)));

outputTuple
.get(OUTPUT_TAG)
.apply("Add UUIDs", PubsubLiteIO.addUuids())
.apply(
"Write to PS Lite",
Expand All @@ -117,7 +169,9 @@ public PCollectionRowTuple expand(PCollectionRowTuple input) {
CloudRegionOrZone.parse(configuration.getLocation()))
.build())
.build()));
return PCollectionRowTuple.empty(input.getPipeline());

return PCollectionRowTuple.of(
"errors", outputTuple.get(ERROR_TAG).setRowSchema(ERROR_SCHEMA));
}
};
}
Expand All @@ -138,7 +192,7 @@ public PCollectionRowTuple expand(PCollectionRowTuple input) {
@Override
public @UnknownKeyFor @NonNull @Initialized List<@UnknownKeyFor @NonNull @Initialized String>
outputCollectionNames() {
return Collections.emptyList();
return Collections.singletonList("errors");
}

@AutoValue
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,84 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.beam.sdk.io.gcp.pubsublite.internal;

import com.google.cloud.pubsublite.proto.PubSubMessage;
import com.google.protobuf.ByteString;
import java.util.Arrays;
import java.util.List;
import org.apache.beam.sdk.io.gcp.pubsublite.PubsubLiteWriteSchemaTransformProvider;
import org.apache.beam.sdk.io.gcp.pubsublite.PubsubLiteWriteSchemaTransformProvider.ErrorCounterFn;
import org.apache.beam.sdk.schemas.Schema;
import org.apache.beam.sdk.schemas.utils.JsonUtils;
import org.apache.beam.sdk.testing.PAssert;
import org.apache.beam.sdk.testing.TestPipeline;
import org.apache.beam.sdk.transforms.Create;
import org.apache.beam.sdk.transforms.ParDo;
import org.apache.beam.sdk.transforms.SerializableFunction;
import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.sdk.values.PCollectionTuple;
import org.apache.beam.sdk.values.Row;
import org.apache.beam.sdk.values.TupleTag;
import org.apache.beam.sdk.values.TupleTagList;
import org.junit.Rule;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.junit.runners.JUnit4;

@RunWith(JUnit4.class)
public class PubsubLiteWriteDlqTest {

private static final TupleTag<PubSubMessage> OUTPUT_TAG =
PubsubLiteWriteSchemaTransformProvider.OUTPUT_TAG;
private static final TupleTag<Row> ERROR_TAG = PubsubLiteWriteSchemaTransformProvider.ERROR_TAG;

private static final Schema BEAMSCHEMA =
Schema.of(Schema.Field.of("name", Schema.FieldType.STRING));
private static final Schema ERRORSCHEMA = PubsubLiteWriteSchemaTransformProvider.ERROR_SCHEMA;

private static final List<Row> ROWS =
Arrays.asList(
Row.withSchema(BEAMSCHEMA).withFieldValue("name", "a").build(),
Row.withSchema(BEAMSCHEMA).withFieldValue("name", "b").build(),
Row.withSchema(BEAMSCHEMA).withFieldValue("name", "c").build());

private static final List<PubSubMessage> MESSAGES =
Arrays.asList(
PubSubMessage.newBuilder().setData(ByteString.copyFromUtf8("{\"name\":\"a\"}")).build(),
PubSubMessage.newBuilder().setData(ByteString.copyFromUtf8("{\"name\":\"b\"}")).build(),
PubSubMessage.newBuilder().setData(ByteString.copyFromUtf8("{\"name\":\"c\"}")).build());

final SerializableFunction<Row, byte[]> valueMapper =
JsonUtils.getRowToJsonBytesFunction(BEAMSCHEMA);

@Rule public transient TestPipeline p = TestPipeline.create();

@Test
public void testPubsubLiteErrorFnSuccess() throws Exception {
PCollection<Row> input = p.apply(Create.of(ROWS));
PCollectionTuple output =
input.apply(
ParDo.of(new ErrorCounterFn("ErrorCounter", valueMapper))
.withOutputTags(OUTPUT_TAG, TupleTagList.of(ERROR_TAG)));

output.get(ERROR_TAG).setRowSchema(ERRORSCHEMA);

PAssert.that(output.get(OUTPUT_TAG)).containsInAnyOrder(MESSAGES);
p.run().waitUntilFinish();
}
}

0 comments on commit 86f56e9

Please sign in to comment.